You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/02/28 18:41:58 UTC
[1/2] hadoop git commit: YARN-6216. Unify Container Resizing code
paths with Container Updates making it scheduler agnostic. (Arun Suresh via
wangda)
Repository: hadoop
Updated Branches:
refs/heads/trunk 480b4dd57 -> eac6b4c35
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index b65f16a..1b20556 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1134,12 +1134,7 @@ public class LeafQueue extends AbstractCSQueue {
if (targetLeafQueue == this) {
// When trying to preempt containers from the same queue
- if (rmContainer.hasIncreaseReservation()) {
- // Increased container reservation
- unreserveIncreasedContainer(clusterResource,
- schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getSchedulerNode(), rmContainer);
- } else if (rmContainer.getState() == RMContainerState.RESERVED) {
+ if (rmContainer.getState() == RMContainerState.RESERVED) {
// For other reserved containers
// This is a reservation exchange, complete previous reserved container
completedContainer(clusterResource,
@@ -1212,8 +1207,7 @@ public class LeafQueue extends AbstractCSQueue {
schedulerContainer.getSchedulerApplicationAttempt(),
allocation.getAllocatedOrReservedResource(),
schedulerContainer.getNodePartition(),
- schedulerContainer.getRmContainer(),
- allocation.isIncreasedAllocation());
+ schedulerContainer.getRmContainer());
orderingPolicy.containerAllocated(
schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getRmContainer());
@@ -1446,40 +1440,6 @@ public class LeafQueue extends AbstractCSQueue {
readLock.unlock();
}
}
-
- @Override
- public void unreserveIncreasedContainer(Resource clusterResource,
- FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
- boolean removed = false;
- Priority priority = null;
-
- try {
- writeLock.lock();
- if (rmContainer.getContainer() != null) {
- priority = rmContainer.getContainer().getPriority();
- }
-
- if (null != priority) {
- removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node,
- rmContainer);
- }
-
- if (removed) {
- // Inform the ordering policy
- orderingPolicy.containerReleased(app, rmContainer);
-
- releaseResource(clusterResource, app, rmContainer.getReservedResource(),
- node.getPartition(), rmContainer, true);
- }
- } finally {
- writeLock.unlock();
- }
-
- if (removed) {
- getParent().unreserveIncreasedContainer(clusterResource, app, node,
- rmContainer);
- }
- }
private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
@@ -1538,16 +1498,6 @@ public class LeafQueue extends AbstractCSQueue {
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
if (application != null) {
- // unreserve container increase request if it previously reserved.
- if (rmContainer.hasIncreaseReservation()) {
- unreserveIncreasedContainer(clusterResource, application, node,
- rmContainer);
- }
-
- // Remove container increase request if it exists
- application.removeIncreaseRequest(node.getNodeID(),
- rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainerId());
-
boolean removed = false;
// Careful! Locking order is important!
@@ -1576,7 +1526,7 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.containerReleased(application, rmContainer);
releaseResource(clusterResource, application, container.getResource(),
- node.getPartition(), rmContainer, false);
+ node.getPartition(), rmContainer);
}
} finally {
writeLock.unlock();
@@ -1597,12 +1547,10 @@ public class LeafQueue extends AbstractCSQueue {
void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
- String nodePartition, RMContainer rmContainer,
- boolean isIncreasedAllocation) {
+ String nodePartition, RMContainer rmContainer) {
try {
writeLock.lock();
- super.allocateResource(clusterResource, resource, nodePartition,
- isIncreasedAllocation);
+ super.allocateResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1643,11 +1591,10 @@ public class LeafQueue extends AbstractCSQueue {
void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition,
- RMContainer rmContainer, boolean isChangeResource) {
+ RMContainer rmContainer) {
try {
writeLock.lock();
- super.releaseResource(clusterResource, resource, nodePartition,
- isChangeResource);
+ super.releaseResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1787,7 +1734,7 @@ public class LeafQueue extends AbstractCSQueue {
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt,
rmContainer.getContainer().getResource(), node.getPartition(),
- rmContainer, false);
+ rmContainer);
} finally {
writeLock.unlock();
}
@@ -1912,7 +1859,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer, false);
+ .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1931,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer, false);
+ .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -2000,85 +1947,6 @@ public class LeafQueue extends AbstractCSQueue {
return defaultAppPriorityPerQueue;
}
- /**
- *
- * @param clusterResource Total cluster resource
- * @param decreaseRequest The decrease request
- * @param app The application of interest
- */
- @Override
- public void decreaseContainer(Resource clusterResource,
- SchedContainerChangeRequest decreaseRequest,
- FiCaSchedulerApp app) throws InvalidResourceRequestException {
- // If the container being decreased is reserved, we need to unreserve it
- // first.
- RMContainer rmContainer = decreaseRequest.getRMContainer();
- if (rmContainer.hasIncreaseReservation()) {
- unreserveIncreasedContainer(clusterResource, app,
- (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
- }
- boolean resourceDecreased = false;
- Resource resourceBeforeDecrease;
- // Grab queue lock to avoid race condition when getting container resource
-
- try {
- writeLock.lock();
- // Make sure the decrease request is valid in terms of current resource
- // and target resource. This must be done under the leaf queue lock.
- // Throws exception if the check fails.
- RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
- // Save resource before decrease for debug log
- resourceBeforeDecrease = Resources.clone(
- rmContainer.getAllocatedResource());
- // Do we have increase request for the same container? If so, remove it
- boolean hasIncreaseRequest = app.removeIncreaseRequest(
- decreaseRequest.getNodeId(),
- decreaseRequest.getRMContainer().getAllocatedSchedulerKey(),
- decreaseRequest.getContainerId());
- if (hasIncreaseRequest) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("While processing decrease requests, found an increase"
- + " request for the same container " + decreaseRequest
- .getContainerId() + ", removed the increase request");
- }
- }
- // Delta capacity is negative when it's a decrease request
- Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
- if (Resources.equals(absDelta, Resources.none())) {
- // If delta capacity of this decrease request is 0, this decrease
- // request serves the purpose of cancelling an existing increase request
- // if any
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrease target resource equals to existing resource for"
- + " container:" + decreaseRequest.getContainerId()
- + " ignore this decrease request.");
- }
- } else{
- // Release the delta resource
- releaseResource(clusterResource, app, absDelta,
- decreaseRequest.getNodePartition(),
- decreaseRequest.getRMContainer(), true);
- // Notify application
- app.decreaseContainer(decreaseRequest);
- // Notify node
- decreaseRequest.getSchedulerNode().decreaseContainer(
- decreaseRequest.getContainerId(), absDelta);
- resourceDecreased = true;
- }
- } finally {
- writeLock.unlock();
- }
-
- if (resourceDecreased) {
- // Notify parent queue outside of leaf queue lock
- getParent().decreaseContainer(clusterResource, decreaseRequest, app);
- LOG.info("Application attempt " + app.getApplicationAttemptId()
- + " decreased container:" + decreaseRequest.getContainerId()
- + " from " + resourceBeforeDecrease + " to "
- + decreaseRequest.getTargetCapacity());
- }
- }
-
public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
Priority newAppPriority) {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 9c42c61..6f82fcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -773,12 +773,11 @@ public class ParentQueue extends AbstractCSQueue {
}
private void internalReleaseResource(Resource clusterResource,
- FiCaSchedulerNode node, Resource releasedResource,
- boolean changeResource) {
+ FiCaSchedulerNode node, Resource releasedResource) {
try {
writeLock.lock();
super.releaseResource(clusterResource, releasedResource,
- node.getPartition(), changeResource);
+ node.getPartition());
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -789,38 +788,6 @@ public class ParentQueue extends AbstractCSQueue {
writeLock.unlock();
}
}
-
- @Override
- public void decreaseContainer(Resource clusterResource,
- SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
- throws InvalidResourceRequestException {
- // delta capacity is negative when it's a decrease request
- Resource absDeltaCapacity =
- Resources.negate(decreaseRequest.getDeltaCapacity());
-
- internalReleaseResource(clusterResource,
- csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false);
-
- // Inform the parent
- if (parent != null) {
- parent.decreaseContainer(clusterResource, decreaseRequest, app);
- }
- }
-
- @Override
- public void unreserveIncreasedContainer(Resource clusterResource,
- FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
- if (app != null) {
- internalReleaseResource(clusterResource, node,
- rmContainer.getReservedResource(), false);
-
- // Inform the parent
- if (parent != null) {
- parent.unreserveIncreasedContainer(clusterResource, app, node,
- rmContainer);
- }
- }
- }
@Override
public void completedContainer(Resource clusterResource,
@@ -830,7 +797,7 @@ public class ParentQueue extends AbstractCSQueue {
boolean sortQueues) {
if (application != null) {
internalReleaseResource(clusterResource, node,
- rmContainer.getContainer().getResource(), false);
+ rmContainer.getContainer().getResource());
// Inform the parent
if (parent != null) {
@@ -886,7 +853,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
- rmContainer.getContainer().getResource(), node.getPartition(), false);
+ rmContainer.getContainer().getResource(), node.getPartition());
} finally {
writeLock.unlock();
}
@@ -923,7 +890,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getPartition(), false);
+ .getResource(), node.getPartition());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -943,7 +910,7 @@ public class ParentQueue extends AbstractCSQueue {
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
- node.getPartition(), false);
+ node.getPartition());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -960,11 +927,10 @@ public class ParentQueue extends AbstractCSQueue {
}
void allocateResource(Resource clusterResource,
- Resource resource, String nodePartition, boolean changeContainerResource) {
+ Resource resource, String nodePartition) {
try {
writeLock.lock();
- super.allocateResource(clusterResource, resource, nodePartition,
- changeContainerResource);
+ super.allocateResource(clusterResource, resource, nodePartition);
/**
* check if we need to kill (killable) containers if maximum resource violated.
@@ -1054,8 +1020,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
- schedulerContainer.getNodePartition(),
- allocation.isIncreasedAllocation());
+ schedulerContainer.getNodePartition());
LOG.info("assignedContainer" + " queue=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 57188d8..4879fae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
public class ContainerAllocator extends AbstractContainerAllocator {
- private AbstractContainerAllocator increaseContainerAllocator;
private AbstractContainerAllocator regularContainerAllocator;
public ContainerAllocator(FiCaSchedulerApp application,
@@ -45,8 +44,6 @@ public class ContainerAllocator extends AbstractContainerAllocator {
RMContext rmContext, ActivitiesManager activitiesManager) {
super(application, rc, rmContext);
- increaseContainerAllocator =
- new IncreaseContainerAllocator(application, rc, rmContext);
regularContainerAllocator = new RegularContainerAllocator(application, rc,
rmContext, activitiesManager);
}
@@ -55,32 +52,8 @@ public class ContainerAllocator extends AbstractContainerAllocator {
public CSAssignment assignContainers(Resource clusterResource,
PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, RMContainer reservedContainer) {
- if (reservedContainer != null) {
- if (reservedContainer.getState() == RMContainerState.RESERVED) {
- // It's a regular container
- return regularContainerAllocator.assignContainers(clusterResource,
- ps, schedulingMode, resourceLimits, reservedContainer);
- } else {
- // It's a increase container
- return increaseContainerAllocator.assignContainers(clusterResource,
- ps, schedulingMode, resourceLimits, reservedContainer);
- }
- } else {
- /*
- * Try to allocate increase container first, and if we failed to allocate
- * anything, we will try to allocate regular container
- */
- CSAssignment assign =
- increaseContainerAllocator.assignContainers(clusterResource, ps,
- schedulingMode, resourceLimits, null);
- if (Resources.greaterThan(rc, clusterResource, assign.getResource(),
- Resources.none())) {
- return assign;
- }
-
- return regularContainerAllocator.assignContainers(clusterResource, ps,
- schedulingMode, resourceLimits, null);
- }
+ return regularContainerAllocator.assignContainers(clusterResource,
+ ps, schedulingMode, resourceLimits, reservedContainer);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
deleted file mode 100644
index 0dc527f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class IncreaseContainerAllocator extends AbstractContainerAllocator {
- private static final Log LOG =
- LogFactory.getLog(IncreaseContainerAllocator.class);
-
- public IncreaseContainerAllocator(FiCaSchedulerApp application,
- ResourceCalculator rc, RMContext rmContext) {
- super(application, rc, rmContext);
- }
-
- /**
- * Quick check if we can allocate anything here:
- * We will not continue if:
- * - Headroom doesn't support allocate minimumAllocation
- * -
- */
- private boolean checkHeadroom(Resource clusterResource,
- ResourceLimits currentResourceLimits, Resource required) {
- return Resources.greaterThanOrEqual(rc, clusterResource,
- currentResourceLimits.getHeadroom(), required);
- }
-
- private CSAssignment createReservedIncreasedCSAssignment(
- SchedContainerChangeRequest request) {
- CSAssignment assignment =
- new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
- application, CSAssignment.SkippedType.NONE, false);
- Resources.addTo(assignment.getAssignmentInformation().getReserved(),
- request.getDeltaCapacity());
- assignment.getAssignmentInformation().incrReservations();
- assignment.getAssignmentInformation().addReservationDetails(
- request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
- assignment.setIncreasedAllocation(true);
-
- LOG.info("Reserved increase container request:" + request.toString());
-
- return assignment;
- }
-
- private CSAssignment createSuccessfullyIncreasedCSAssignment(
- SchedContainerChangeRequest request, boolean fromReservation) {
- CSAssignment assignment =
- new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
- application, CSAssignment.SkippedType.NONE, fromReservation);
- Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- request.getDeltaCapacity());
- assignment.getAssignmentInformation().incrAllocations();
- assignment.getAssignmentInformation().addAllocationDetails(
- request.getRMContainer(), application.getCSLeafQueue().getQueuePath());
- assignment.setIncreasedAllocation(true);
-
- if (fromReservation) {
- assignment.setFulfilledReservedContainer(request.getRMContainer());
- }
-
- // notify application
- application
- .getCSLeafQueue()
- .getOrderingPolicy()
- .containerAllocated(application,
- application.getRMContainer(request.getContainerId()));
-
- LOG.info("Approved increase container request:" + request.toString()
- + " fromReservation=" + fromReservation);
-
- return assignment;
- }
-
- private CSAssignment allocateIncreaseRequestFromReservedContainer(
- SchedulerNode node, Resource cluster,
- SchedContainerChangeRequest increaseRequest) {
- if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
- node.getUnallocatedResource())) {
- return createSuccessfullyIncreasedCSAssignment(increaseRequest, true);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to allocate reserved increase request:"
- + increaseRequest.toString()
- + ". There's no enough available resource");
- }
-
- // We still cannot allocate this container, will wait for next turn
- return CSAssignment.SKIP_ASSIGNMENT;
- }
- }
-
- private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node,
- Resource cluster, SchedContainerChangeRequest increaseRequest) {
- if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
- node.getUnallocatedResource())) {
- return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
- } else{
- // We cannot allocate this container, but since queue capacity /
- // user-limit matches, we can reserve this container on this node.
- return createReservedIncreasedCSAssignment(increaseRequest);
- }
- }
-
- @Override
- public CSAssignment assignContainers(Resource clusterResource,
- PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode,
- ResourceLimits resourceLimits, RMContainer reservedContainer) {
- AppSchedulingInfo sinfo = application.getAppSchedulingInfo();
- FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
-
- if (null == node) {
- // This is global scheduling enabled
- // FIXME, support container increase when global scheduling enabled
- return CSAssignment.SKIP_ASSIGNMENT;
- }
- NodeId nodeId = node.getNodeID();
-
- if (reservedContainer == null) {
- // Do we have increase request on this node?
- if (!sinfo.hasIncreaseRequest(nodeId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip allocating increase request since we don't have any"
- + " increase request on this node=" + node.getNodeID());
- }
-
- return CSAssignment.SKIP_ASSIGNMENT;
- }
-
- // Check if we need to unreserve something, note that we don't support
- // continuousReservationLooking now. TODO, need think more about how to
- // support it.
- boolean shouldUnreserve =
- Resources.greaterThan(rc, clusterResource,
- resourceLimits.getAmountNeededUnreserve(), Resources.none());
-
- // Check if we can allocate minimum resource according to headroom
- boolean cannotAllocateAnything =
- !checkHeadroom(clusterResource, resourceLimits, rmContext
- .getScheduler().getMinimumResourceCapability());
-
- // Skip the app if we failed either of above check
- if (cannotAllocateAnything || shouldUnreserve) {
- if (LOG.isDebugEnabled()) {
- if (shouldUnreserve) {
- LOG.debug("Cannot continue since we have to unreserve some resource"
- + ", now increase container allocation doesn't "
- + "support continuous reservation looking..");
- }
- if (cannotAllocateAnything) {
- LOG.debug("We cannot allocate anything because of low headroom, "
- + "headroom=" + resourceLimits.getHeadroom());
- }
- }
-
- return CSAssignment.SKIP_ASSIGNMENT;
- }
-
- CSAssignment assigned = null;
-
- /*
- * Loop each priority, and containerId. Container priority is not
- * equivalent to request priority, application master can run an important
- * task on a less prioritized container.
- *
- * So behavior here is, we still try to increase container with higher
- * priority, but will skip increase request and move to next increase
- * request if queue-limit or user-limit aren't satisfied
- */
- for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Looking at increase request for application="
- + application.getApplicationAttemptId() + " priority="
- + schedulerKey.getPriority());
- }
-
- /*
- * If we have multiple to-be-increased containers under same priority on
- * a same host, we will try to increase earlier launched container
- * first. And again - we will skip a request and move to next if it
- * cannot be allocated.
- */
- Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap =
- sinfo.getIncreaseRequests(nodeId, schedulerKey);
-
- // We don't have more increase request on this priority, skip..
- if (null == increaseRequestMap) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("There's no increase request for "
- + application.getApplicationAttemptId() + " priority="
- + schedulerKey.getPriority());
- }
- continue;
- }
- Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter =
- increaseRequestMap.entrySet().iterator();
-
- while (iter.hasNext()) {
- Entry<ContainerId, SchedContainerChangeRequest> entry =
- iter.next();
- SchedContainerChangeRequest increaseRequest =
- entry.getValue();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Looking at increase request=" + increaseRequest.toString());
- }
-
- boolean headroomSatisifed = checkHeadroom(clusterResource,
- resourceLimits, increaseRequest.getDeltaCapacity());
- if (!headroomSatisifed) {
- // skip if doesn't satisfy headroom limit
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Headroom is not satisfied, skip..");
- }
- continue;
- }
-
- RMContainer rmContainer = increaseRequest.getRMContainer();
- if (rmContainer.getContainerState() != ContainerState.RUNNING) {
- // if the container is not running, we should remove the
- // increaseRequest and continue;
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Container is not running any more, skip...");
- }
- application.addToBeRemovedIncreaseRequest(increaseRequest);
- continue;
- }
-
- if (!Resources.fitsIn(rc, clusterResource,
- increaseRequest.getTargetCapacity(), node.getTotalResource())) {
- // if the target capacity is more than what the node can offer, we
- // will simply remove and skip it.
- // The reason of doing check here instead of adding increase request
- // to scheduler because node's resource could be updated after
- // request added.
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Target capacity is more than what node can offer,"
- + " node.resource=" + node.getTotalResource());
- }
- application.addToBeRemovedIncreaseRequest(increaseRequest);
- continue;
- }
-
- // Try to allocate the increase request
- assigned =
- allocateIncreaseRequest(node, clusterResource, increaseRequest);
- if (assigned.getSkippedType()
- == CSAssignment.SkippedType.NONE) {
- // When we don't skip this request, which means we either allocated
- // OR reserved this request. We will break
- break;
- }
- }
-
- // We may have allocated something
- if (assigned != null && assigned.getSkippedType()
- == CSAssignment.SkippedType.NONE) {
- break;
- }
- }
-
- return assigned == null ? CSAssignment.SKIP_ASSIGNMENT : assigned;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to allocate reserved increase container request..");
- }
-
- // We already reserved this increase container
- SchedContainerChangeRequest request =
- sinfo.getIncreaseRequest(nodeId,
- reservedContainer.getAllocatedSchedulerKey(),
- reservedContainer.getContainerId());
-
- // We will cancel the reservation any of following happens
- // - Container finished
- // - No increase request needed
- // - Target resource updated
- if (null == request
- || reservedContainer.getContainerState() != ContainerState.RUNNING
- || (!Resources.equals(reservedContainer.getReservedResource(),
- request.getDeltaCapacity()))) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("We don't need reserved increase container request "
- + "for container=" + reservedContainer.getContainerId()
- + ". Unreserving and return...");
- }
-
- // We don't need this container now, just return excessive reservation
- return new CSAssignment(application, reservedContainer);
- }
-
- return allocateIncreaseRequestFromReservedContainer(node, clusterResource,
- request);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
index ac83d6f..2921e7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java
@@ -43,8 +43,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
// not be included by toRelease list
private SchedulerContainer<A, N> allocateFromReservedContainer;
- private boolean isIncreasedAllocation;
-
private NodeType allocationLocalityType;
private NodeType requestLocalityType;
@@ -57,7 +55,7 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
SchedulerContainer<A, N> allocatedOrReservedContainer,
List<SchedulerContainer<A, N>> toRelease,
SchedulerContainer<A, N> allocateFromReservedContainer,
- boolean isIncreasedAllocation, NodeType allocationLocalityType,
+ NodeType allocationLocalityType,
NodeType requestLocalityType, SchedulingMode schedulingMode,
Resource allocatedResource) {
this.allocatedOrReservedContainer = allocatedOrReservedContainer;
@@ -65,7 +63,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
this.toRelease = toRelease;
}
this.allocateFromReservedContainer = allocateFromReservedContainer;
- this.isIncreasedAllocation = isIncreasedAllocation;
this.allocationLocalityType = allocationLocalityType;
this.requestLocalityType = requestLocalityType;
this.schedulingMode = schedulingMode;
@@ -84,10 +81,6 @@ public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt,
return allocationLocalityType;
}
- public boolean isIncreasedAllocation() {
- return isIncreasedAllocation;
- }
-
public SchedulerContainer<A, N> getAllocateFromReservedContainer() {
return allocateFromReservedContainer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 30b7305..fea29bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -312,54 +312,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return false;
}
- private SchedContainerChangeRequest getResourceChangeRequest(
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
- return appSchedulingInfo.getIncreaseRequest(
- schedulerContainer.getSchedulerNode().getNodeID(),
- schedulerContainer.getSchedulerRequestKey(),
- schedulerContainer.getRmContainer().getContainerId());
- }
-
- private boolean checkIncreaseContainerAllocation(
- ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
- // When increase a container
- if (schedulerContainer.getRmContainer().getState()
- != RMContainerState.RUNNING) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to increase a container, but container="
- + schedulerContainer.getRmContainer().getContainerId()
- + " is not in running state.");
- }
- return false;
- }
-
- // Check if increase request is still valid
- SchedContainerChangeRequest increaseRequest = getResourceChangeRequest(
- schedulerContainer);
-
- if (null == increaseRequest || !Resources.equals(
- increaseRequest.getDeltaCapacity(),
- allocation.getAllocatedOrReservedResource())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Increase request has been changed, reject this proposal");
- }
- return false;
- }
-
- if (allocation.getAllocateFromReservedContainer() != null) {
- // In addition, if allocation is from a reserved container, check
- // if the reserved container has enough reserved space
- if (!Resources.equals(
- allocation.getAllocateFromReservedContainer().getRmContainer()
- .getReservedResource(), increaseRequest.getDeltaCapacity())) {
- return false;
- }
- }
-
- return true;
- }
-
private boolean commonCheckContainerAllocation(
Resource cluster,
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
@@ -445,30 +397,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer = allocation.getAllocatedOrReservedContainer();
if (schedulerContainer.isAllocated()) {
- if (!allocation.isIncreasedAllocation()) {
- // When allocate a new container
- resourceRequests =
- schedulerContainer.getRmContainer().getResourceRequests();
-
- // Check pending resource request
- if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
- schedulerContainer.getSchedulerNode(),
- schedulerContainer.getSchedulerRequestKey())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No pending resource for: nodeType=" + allocation
- .getAllocationLocalityType() + ", node=" + schedulerContainer
- .getSchedulerNode() + ", requestKey=" + schedulerContainer
- .getSchedulerRequestKey() + ", application="
- + getApplicationAttemptId());
- }
-
- return false;
- }
- } else {
- if (!checkIncreaseContainerAllocation(allocation,
- schedulerContainer)) {
- return false;
+ // When allocate a new container
+ resourceRequests =
+ schedulerContainer.getRmContainer().getResourceRequests();
+
+ // Check pending resource request
+ if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
+ schedulerContainer.getSchedulerNode(),
+ schedulerContainer.getSchedulerRequestKey())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No pending resource for: nodeType=" + allocation
+ .getAllocationLocalityType() + ", node=" + schedulerContainer
+ .getSchedulerNode() + ", requestKey=" + schedulerContainer
+ .getSchedulerRequestKey() + ", application="
+ + getApplicationAttemptId());
}
+
+ return false;
}
// Common part of check container allocation regardless if it is a
@@ -541,12 +486,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Generate new containerId if it is not an allocation for increasing
// Or re-reservation
- if (!allocation.isIncreasedAllocation()) {
- if (rmContainer.getContainer().getId() == null) {
- rmContainer.setContainerId(BuilderUtils
- .newContainerId(getApplicationAttemptId(),
- getNewContainerId()));
- }
+ if (rmContainer.getContainer().getId() == null) {
+ rmContainer.setContainerId(BuilderUtils
+ .newContainerId(getApplicationAttemptId(),
+ getNewContainerId()));
}
ContainerId containerId = rmContainer.getContainerId();
@@ -562,77 +505,50 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulerContainer.getSchedulerNode(), reservedContainer);
}
- // Update this application for the allocated container
- if (!allocation.isIncreasedAllocation()) {
- // Allocate a new container
- addToNewlyAllocatedContainers(
- schedulerContainer.getSchedulerNode(), rmContainer);
- liveContainers.put(containerId, rmContainer);
+ // Allocate a new container
+ addToNewlyAllocatedContainers(
+ schedulerContainer.getSchedulerNode(), rmContainer);
+ liveContainers.put(containerId, rmContainer);
- // Deduct pending resource requests
- List<ResourceRequest> requests = appSchedulingInfo.allocate(
- allocation.getAllocationLocalityType(),
- schedulerContainer.getSchedulerNode(),
- schedulerContainer.getSchedulerRequestKey(),
- schedulerContainer.getRmContainer().getContainer());
- ((RMContainerImpl) rmContainer).setResourceRequests(requests);
+ // Deduct pending resource requests
+ List<ResourceRequest> requests = appSchedulingInfo.allocate(
+ allocation.getAllocationLocalityType(),
+ schedulerContainer.getSchedulerNode(),
+ schedulerContainer.getSchedulerRequestKey(),
+ schedulerContainer.getRmContainer().getContainer());
+ ((RMContainerImpl) rmContainer).setResourceRequests(requests);
- attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
- allocation.getAllocatedOrReservedResource());
+ attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
+ allocation.getAllocatedOrReservedResource());
- rmContainer.handle(
- new RMContainerEvent(containerId, RMContainerEventType.START));
+ rmContainer.handle(
+ new RMContainerEvent(containerId, RMContainerEventType.START));
- // Inform the node
- schedulerContainer.getSchedulerNode().allocateContainer(
- rmContainer);
+ // Inform the node
+ schedulerContainer.getSchedulerNode().allocateContainer(
+ rmContainer);
- // update locality statistics,
- incNumAllocatedContainers(allocation.getAllocationLocalityType(),
- allocation.getRequestLocalityType());
+ // update locality statistics,
+ incNumAllocatedContainers(allocation.getAllocationLocalityType(),
+ allocation.getRequestLocalityType());
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocate: applicationAttemptId=" + containerId
- .getApplicationAttemptId() + " container=" + containerId
- + " host=" + rmContainer.getAllocatedNode().getHost()
- + " type=" + allocation.getAllocationLocalityType());
- }
- RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
- "SchedulerApp", getApplicationId(), containerId,
- allocation.getAllocatedOrReservedResource());
- } else{
- SchedContainerChangeRequest increaseRequest =
- getResourceChangeRequest(schedulerContainer);
-
- // allocate resource for an increase request
- // Notify node
- schedulerContainer.getSchedulerNode().increaseContainer(
- increaseRequest.getContainerId(),
- increaseRequest.getDeltaCapacity());
-
- // OK, we can allocate this increase request
- // Notify application
- increaseContainer(increaseRequest);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocate: applicationAttemptId=" + containerId
+ .getApplicationAttemptId() + " container=" + containerId
+ + " host=" + rmContainer.getAllocatedNode().getHost()
+ + " type=" + allocation.getAllocationLocalityType());
}
+ RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+ "SchedulerApp", getApplicationId(), containerId,
+ allocation.getAllocatedOrReservedResource());
} else {
- if (!allocation.isIncreasedAllocation()) {
- // If the rmContainer's state is already updated to RESERVED, this is
- // a reReservation
- reserve(schedulerContainer.getSchedulerRequestKey(),
- schedulerContainer.getSchedulerNode(),
- schedulerContainer.getRmContainer(),
- schedulerContainer.getRmContainer().getContainer(),
- reReservation);
- } else{
- SchedContainerChangeRequest increaseRequest =
- getResourceChangeRequest(schedulerContainer);
-
- reserveIncreasedContainer(
- schedulerContainer.getSchedulerRequestKey(),
- schedulerContainer.getSchedulerNode(),
- increaseRequest.getRMContainer(),
- increaseRequest.getDeltaCapacity());
- }
+ // If the rmContainer's state is already updated to RESERVED, this is
+ // a reReservation
+ reserve(schedulerContainer.getSchedulerRequestKey(),
+ schedulerContainer.getSchedulerNode(),
+ schedulerContainer.getRmContainer(),
+ schedulerContainer.getRmContainer().getContainer(),
+ reReservation);
}
}
} finally {
@@ -649,9 +565,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
FiCaSchedulerNode node, RMContainer rmContainer) {
try {
writeLock.lock();
- // Cancel increase request (if it has reserved increase request
- rmContainer.cancelIncreaseReservation();
-
// Done with the reservation?
if (internalUnreserve(node, schedulerKey)) {
node.unreserveResource(this);
@@ -807,13 +720,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
.entrySet()) {
NodeId nodeId = entry.getKey();
RMContainer reservedContainer = entry.getValue();
- if (reservedContainer.hasIncreaseReservation()) {
- // Currently, only regular container allocation supports continuous
- // reservation looking, we don't support canceling increase request
- // reservation when allocating regular container.
- continue;
- }
-
Resource reservedResource = reservedContainer.getReservedResource();
// make sure we unreserve one with at least the same amount of
@@ -869,25 +775,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
}
- public boolean reserveIncreasedContainer(SchedulerRequestKey schedulerKey,
- FiCaSchedulerNode node,
- RMContainer rmContainer, Resource reservedResource) {
- // Inform the application
- if (super.reserveIncreasedContainer(node, schedulerKey, rmContainer,
- reservedResource)) {
-
- queue.getMetrics().reserveResource(getUser(), reservedResource);
-
- // Update the node
- node.reserveResource(this, schedulerKey, rmContainer);
-
- // Succeeded
- return true;
- }
-
- return false;
- }
-
public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
RMContainer rmContainer, Container container, boolean reReservation) {
// Update reserved metrics if this is the first reservation
@@ -1114,26 +1001,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
toBeRemovedIncRequests.put(request.getContainerId(), request);
}
- public void removedToBeRemovedIncreaseRequests() {
- // Remove invalid in request requests
- if (!toBeRemovedIncRequests.isEmpty()) {
- try {
- writeLock.lock();
- Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter =
- toBeRemovedIncRequests.entrySet().iterator();
- while (iter.hasNext()) {
- SchedContainerChangeRequest req = iter.next().getValue();
- appSchedulingInfo.removeIncreaseRequest(req.getNodeId(),
- req.getRMContainer().getAllocatedSchedulerKey(),
- req.getContainerId());
- iter.remove();
- }
- } finally {
- writeLock.unlock();
- }
- }
- }
-
/*
* Overriding to appease findbugs
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 8eac929..c26a11b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -153,20 +153,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
}
}
- @Override
- protected synchronized void changeContainerResource(ContainerId containerId,
- Resource deltaResource, boolean increase) {
- super.changeContainerResource(containerId, deltaResource, increase);
-
- if (killableContainers.containsKey(containerId)) {
- if (increase) {
- Resources.addTo(totalKillableResources, deltaResource);
- } else {
- Resources.subtractFrom(totalKillableResources, deltaResource);
- }
- }
- }
-
public synchronized Resource getTotalKillableResources() {
return totalKillableResources;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3246778..0599414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -819,9 +819,7 @@ public class FairScheduler extends
}
// Handle promotions and demotions
- handleExecutionTypeUpdates(
- application, updateRequests.getPromotionRequests(),
- updateRequests.getDemotionRequests());
+ handleContainerUpdates(application, updateRequests);
// Sanity check
normalizeRequests(ask);
@@ -1769,13 +1767,6 @@ public class FairScheduler extends
return targetQueueName;
}
- @Override
- protected void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- // TODO Auto-generated method stub
- }
-
public float getReservableNodesRatio() {
return reservableNodesRatio;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 64dbc7d..a8d4f48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -937,14 +937,6 @@ public class FifoScheduler extends
}
@Override
- protected void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
protected synchronized void nodeUpdate(RMNode nm) {
super.nodeUpdate(nm);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 899523c..e34665d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -137,11 +137,11 @@ public class TestChildQueueOrder {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource, null, null, false);
+ allocatedResource, null, null);
}
// Next call - nothing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 0696f57..b4b05ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -88,18 +88,6 @@ public class TestContainerResizing {
}
@Override
- protected void decreaseContainers(
- List<UpdateContainerRequest> decreaseRequests,
- SchedulerApplicationAttempt attempt) {
- try {
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- LOG.debug("Thread interrupted.");
- }
- super.decreaseContainers(decreaseRequests, attempt);
- }
-
- @Override
public CSAssignment allocateContainersToNode(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
try {
@@ -288,13 +276,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
- RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */
// Increase request should be reserved
- Assert.assertTrue(rmContainer1.hasIncreaseReservation());
- Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -319,7 +303,6 @@ public class TestContainerResizing {
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
- Assert.assertFalse(rmContainer1.hasIncreaseReservation());
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will be changed since it's satisfied
@@ -391,11 +374,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId2);
-
/* Check reservation statuses */
// Increase request should *NOT* be reserved as it exceeds user limit
- Assert.assertFalse(rmContainer1.hasIncreaseReservation());
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -471,13 +451,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
- RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */
// Increase request should be reserved
- Assert.assertTrue(rmContainer1.hasIncreaseReservation());
- Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -510,7 +486,6 @@ public class TestContainerResizing {
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
- Assert.assertFalse(rmContainer1.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB,
@@ -585,13 +560,9 @@ public class TestContainerResizing {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
- RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
/* Check reservation statuses */
// Increase request should be reserved
- Assert.assertTrue(rmContainer1.hasIncreaseReservation());
- Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -614,7 +585,7 @@ public class TestContainerResizing {
am1.sendContainerResizingRequest(Arrays.asList(
UpdateContainerRequest
.newInstance(0, containerId1,
- ContainerUpdateType.INCREASE_RESOURCE,
+ ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB), null)));
// Trigger a node heartbeat..
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
@@ -623,7 +594,6 @@ public class TestContainerResizing {
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
- Assert.assertFalse(rmContainer1.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB,
@@ -698,12 +668,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
-
/* Check reservation statuses */
// Increase request should be reserved
- Assert.assertTrue(rmContainer2.hasIncreaseReservation());
- Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -721,12 +687,13 @@ public class TestContainerResizing {
// Complete container2, container will be unreserved and completed
am1.allocate(null, Arrays.asList(containerId2));
-
+
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ am1.allocate(null, null);
/* Check statuses after reservation satisfied */
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
- Assert.assertFalse(rmContainer2.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied
checkPendingResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB,
@@ -796,12 +763,8 @@ public class TestContainerResizing {
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
-
/* Check reservation statuses */
// Increase request should be reserved
- Assert.assertTrue(rmContainer2.hasIncreaseReservation());
- Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemorySize());
Assert.assertFalse(app.getReservedContainers().isEmpty());
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Pending resource will not be changed since it's not satisfied
@@ -825,11 +788,11 @@ public class TestContainerResizing {
// Increase request should be unreserved
Assert.assertTrue(app.getReservedContainers().isEmpty());
Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
- Assert.assertFalse(rmContainer2.hasIncreaseReservation());
// Pending resource will be changed since it's satisfied
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
checkPendingResource(rm1, "default", 0 * GB, null);
- Assert.assertEquals(0 * GB,
- app.getAppAttemptResourceUsage().getPending().getMemorySize());
+
// Queue/user/application's usage will be updated
checkUsedResource(rm1, "default", 0 * GB, null);
// User will be removed
@@ -949,89 +912,6 @@ public class TestContainerResizing {
rm1.close();
}
- @Test
- public void testIncreaseContainerRequestGetPreferrence()
- throws Exception {
- /**
- * There're multiple containers need to be increased, and there're several
- * container allocation request, scheduler will try to increase container
- * before allocate new containers
- */
- MockRM rm1 = new MockRM() {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
-
- // app1 -> a1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- FiCaSchedulerApp app = TestUtils.getFiCaSchedulerApp(
- rm1, app1.getApplicationId());
- ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
-
- // Container 2, 3 (priority=3)
- allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
-
- // Container 4, 5 (priority=2)
- allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
-
- // Container 6, 7 (priority=4)
- allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
-
- // am1 asks to change its container[2-7] from 1G to 2G
- List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
- for (int cId = 2; cId <= 7; cId++) {
- ContainerId containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
- increaseRequests.add(UpdateContainerRequest
- .newInstance(0, containerId,
- ContainerUpdateType.INCREASE_RESOURCE,
- Resources.createResource(2 * GB), null));
- }
- am1.sendContainerResizingRequest(increaseRequests);
-
- checkPendingResource(rm1, "default", 6 * GB, null);
- Assert.assertEquals(6 * GB,
- app.getAppAttemptResourceUsage().getPending().getMemorySize());
-
- // Get rmNode1
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // assignContainer, container-4/5/2 increased (which has highest priority OR
- // earlier allocated)
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- AllocateResponse allocateResponse = am1.allocate(null, null);
- Assert.assertEquals(3, allocateResponse.getUpdatedContainers().size());
- verifyContainerIncreased(allocateResponse,
- ContainerId.newContainerId(attemptId, 4), 2 * GB);
- verifyContainerIncreased(allocateResponse,
- ContainerId.newContainerId(attemptId, 5), 2 * GB);
- verifyContainerIncreased(allocateResponse,
- ContainerId.newContainerId(attemptId, 2), 2 * GB);
-
- /* Check statuses after allocation */
- // There're still 3 pending increase requests
- checkPendingResource(rm1, "default", 3 * GB, null);
- Assert.assertEquals(3 * GB,
- app.getAppAttemptResourceUsage().getPending().getMemorySize());
- // Queue/user/application's usage will be updated
- checkUsedResource(rm1, "default", 10 * GB, null);
- Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
- .getUser("user").getUsed().getMemorySize());
- Assert.assertEquals(0 * GB,
- app.getAppAttemptResourceUsage().getReserved().getMemorySize());
- Assert.assertEquals(10 * GB,
- app.getAppAttemptResourceUsage().getUsed().getMemorySize());
-
- rm1.close();
- }
-
@Test (timeout = 60000)
public void testDecreaseContainerWillNotDeadlockContainerAllocation()
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
index 74cecf2..184e854 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java
@@ -200,6 +200,7 @@ public class TestIncreaseAllocationExpirer {
// back action to complete
Thread.sleep(10000);
// Verify container size is 1G
+ am1.allocate(null, null);
Assert.assertEquals(
1 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize());
@@ -304,6 +305,8 @@ public class TestIncreaseAllocationExpirer {
// Wait long enough for the second token (5G) to expire, and verify that
// the roll back action is completed as expected
Thread.sleep(10000);
+ am1.allocate(null, null);
+ Thread.sleep(2000);
// Verify container size is rolled back to 3G
Assert.assertEquals(
3 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
@@ -400,13 +403,13 @@ public class TestIncreaseAllocationExpirer {
// Decrease containers
List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId2,
- ContainerUpdateType.INCREASE_RESOURCE,
+ ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(2 * GB), null));
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId3,
- ContainerUpdateType.INCREASE_RESOURCE,
+ ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(4 * GB), null));
decreaseRequests.add(UpdateContainerRequest.newInstance(1, containerId4,
- ContainerUpdateType.INCREASE_RESOURCE,
+ ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(4 * GB), null));
AllocateResponse response =
am1.sendContainerResizingRequest(decreaseRequests);
@@ -418,6 +421,9 @@ public class TestIncreaseAllocationExpirer {
rm1, containerId4, Resources.createResource(6 * GB)));
// Wait for containerId3 token to expire,
Thread.sleep(10000);
+
+ am1.allocate(null, null);
+
Assert.assertEquals(
2 * GB, rm1.getResourceScheduler().getRMContainer(containerId2)
.getAllocatedResource().getMemorySize());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index ec1b84d..3fbbae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1061,11 +1061,11 @@ public class TestLeafQueue {
qb.releaseResource(clusterResource, app_0,
app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
.getPerAllocationResource(),
- null, null, false);
+ null, null);
qb.releaseResource(clusterResource, app_2,
app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
.getPerAllocationResource(),
- null, null, false);
+ null, null);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 11fea82..c4b7a0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -171,11 +171,11 @@ public class TestParentQueue {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource, null, null, false);
+ allocatedResource, null, null);
}
// Next call - nothing
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-6216. Unify Container Resizing code
paths with Container Updates making it scheduler agnostic. (Arun Suresh via
wangda)
Posted by wa...@apache.org.
YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eac6b4c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eac6b4c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eac6b4c3
Branch: refs/heads/trunk
Commit: eac6b4c35c50e555c2f1b5f913bb2c4d839f1ff4
Parents: 480b4dd
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Feb 28 10:35:50 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Feb 28 10:35:50 2017 -0800
----------------------------------------------------------------------
.../sls/scheduler/ResourceSchedulerWrapper.java | 8 -
.../server/scheduler/SchedulerRequestKey.java | 12 +-
.../server/resourcemanager/RMServerUtils.java | 27 +-
.../rmcontainer/RMContainer.java | 4 -
.../RMContainerChangeResourceEvent.java | 44 ---
.../rmcontainer/RMContainerImpl.java | 46 ---
.../scheduler/AbstractYarnScheduler.java | 171 +++++++---
.../scheduler/AppSchedulingInfo.java | 283 +---------------
.../scheduler/ContainerUpdateContext.java | 193 ++++++++---
.../scheduler/SchedulerApplicationAttempt.java | 212 ++++--------
.../scheduler/SchedulerNode.java | 44 ---
.../scheduler/capacity/AbstractCSQueue.java | 13 +-
.../scheduler/capacity/CSQueue.java | 15 -
.../scheduler/capacity/CapacityScheduler.java | 121 +------
.../scheduler/capacity/LeafQueue.java | 152 +--------
.../scheduler/capacity/ParentQueue.java | 53 +--
.../capacity/allocator/ContainerAllocator.java | 31 +-
.../allocator/IncreaseContainerAllocator.java | 337 -------------------
.../common/ContainerAllocationProposal.java | 9 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 245 +++-----------
.../common/fica/FiCaSchedulerNode.java | 14 -
.../scheduler/fair/FairScheduler.java | 11 +-
.../scheduler/fifo/FifoScheduler.java | 8 -
.../scheduler/capacity/TestChildQueueOrder.java | 4 +-
.../capacity/TestContainerResizing.java | 134 +-------
.../capacity/TestIncreaseAllocationExpirer.java | 12 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../scheduler/capacity/TestParentQueue.java | 4 +-
28 files changed, 482 insertions(+), 1729 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 5517362..df8323a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -969,12 +969,4 @@ final public class ResourceSchedulerWrapper
return Priority.newInstance(0);
}
- @Override
- protected void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- // TODO Auto-generated method stub
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 02539ba..c4f37f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements
if (priorityCompare != 0) {
return priorityCompare;
}
- return Long.compare(allocationRequestId, o.getAllocationRequestId());
+ int allocReqCompare = Long.compare(
+ allocationRequestId, o.getAllocationRequestId());
+
+ if (allocReqCompare != 0) {
+ return allocReqCompare;
+ }
+
+ if (this.containerToUpdate != null && o.containerToUpdate != null) {
+ return (this.containerToUpdate.compareTo(o.containerToUpdate));
+ }
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e98141b..0aa7a2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -152,26 +152,16 @@ public class RMServerUtils {
if (msg == null) {
if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
(updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
- Resource original = rmContainer.getContainer().getResource();
- Resource target = updateReq.getCapability();
- if (Resources.fitsIn(target, original)) {
- // This is a decrease request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, false)) {
- updateRequests.getDecreaseRequests().add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
- } else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
- }
- } else {
- // This is an increase request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, true)) {
+ if (validateIncreaseDecreaseRequest(
+ rmContext, updateReq, maximumAllocation)) {
+ if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
updateRequests.getIncreaseRequests().add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
} else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+ updateRequests.getDecreaseRequests().add(updateReq);
}
+ outstandingUpdate.add(updateReq.getContainerId());
+ } else {
+ msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
}
} else {
ExecutionType original = rmContainer.getExecutionType();
@@ -329,8 +319,7 @@ public class RMServerUtils {
// Sanity check and normalize target resource
private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
- UpdateContainerRequest request, Resource maximumAllocation,
- boolean increase) {
+ UpdateContainerRequest request, Resource maximumAllocation) {
if (request.getCapability().getMemorySize() < 0
|| request.getCapability().getMemorySize() > maximumAllocation
.getMemorySize()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 020764b..7ad381e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
String getNodeHttpAddress();
String getNodeLabelExpression();
-
- boolean hasIncreaseReservation();
-
- void cancelIncreaseReservation();
String getQueueName();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
deleted file mode 100644
index 920cfdb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class RMContainerChangeResourceEvent extends RMContainerEvent {
-
- final Resource targetResource;
- final boolean increase;
-
- public RMContainerChangeResourceEvent(ContainerId containerId,
- Resource targetResource, boolean increase) {
- super(containerId, RMContainerEventType.CHANGE_RESOURCE);
-
- this.targetResource = targetResource;
- this.increase = increase;
- }
-
- public Resource getTargetResource() {
- return targetResource;
- }
-
- public boolean isIncrease() {
- return increase;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 72ce1a0..12fbbea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -131,8 +131,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
- RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
- .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.ACQUIRE_UPDATED_CONTAINER,
new ContainerAcquiredWhileRunningTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private boolean isAMContainer;
private List<ResourceRequest> resourceRequests;
- private volatile boolean hasIncreaseReservation = false;
// Only used for container resource increase and decrease. This is the
// resource to rollback to should container resource increase token expires.
private Resource lastConfirmedResource;
@@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
if (c != null) {
c.setNodeId(container.reservedNode);
}
-
- if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
- .contains(container.getState())) {
- // When container's state != NEW/RESERVED, it is an increase reservation
- container.hasIncreaseReservation = true;
- }
}
}
@@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
}
-
- private static final class ChangeResourceTransition extends BaseTransition {
-
- @Override
- public void transition(RMContainerImpl container, RMContainerEvent event) {
- RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
-
- Resource targetResource = changeEvent.getTargetResource();
- Resource lastConfirmedResource = container.lastConfirmedResource;
-
- if (!changeEvent.isIncrease()) {
- // Only unregister from the containerAllocationExpirer when target
- // resource is less than or equal to the last confirmed resource.
- if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
- container.lastConfirmedResource = targetResource;
- container.containerAllocationExpirer.unregister(
- new AllocationExpirationInfo(event.getContainerId()));
- }
- }
-
- container.container.setResource(targetResource);
-
- // We reach here means we either allocated increase reservation OR
- // decreased container, reservation will be cancelled anyway.
- container.hasIncreaseReservation = false;
- }
- }
private static class FinishedTransition extends BaseTransition {
@@ -857,16 +821,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
return -1;
}
- @Override
- public boolean hasIncreaseReservation() {
- return hasIncreaseReservation;
- }
-
- @Override
- public void cancelIncreaseReservation() {
- hasIncreaseReservation = false;
- }
-
public void setQueueName(String queueName) {
this.queueName = queueName;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ce6d2a2..213839d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
completedContainerInternal(rmContainer, containerStatus, event);
+ completeOustandingUpdatesWhichAreReserved(
+ rmContainer, containerStatus, event);
} else {
ContainerId containerId = rmContainer.getContainerId();
// Inform the container
@@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler
recoverResourceRequestForContainer(rmContainer);
}
+ // Optimization:
+ // Check if there are in-flight container updates and complete the
+ // associated temp containers. These are removed when the app completes,
+ // but removing them when the actual container completes would allow the
+ // scheduler to reallocate those resources sooner.
+ private void completeOustandingUpdatesWhichAreReserved(
+ RMContainer rmContainer, ContainerStatus containerStatus,
+ RMContainerEventType event) {
+ N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
+ if (schedulerNode != null &&
+ schedulerNode.getReservedContainer() != null) {
+ RMContainer resContainer = schedulerNode.getReservedContainer();
+ if (resContainer.getReservedSchedulerKey() != null) {
+ ContainerId containerToUpdate = resContainer
+ .getReservedSchedulerKey().getContainerToUpdate();
+ if (containerToUpdate != null &&
+ containerToUpdate.equals(containerStatus.getContainerId())) {
+ completedContainerInternal(resContainer,
+ ContainerStatus.newInstance(resContainer.getContainerId(),
+ containerStatus.getState(), containerStatus
+ .getDiagnostics(),
+ containerStatus.getExitStatus()), event);
+ }
+ }
+ }
+ }
+
// clean up a completed container
protected abstract void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);
@@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler
}
}
- protected void decreaseContainers(
- List<UpdateContainerRequest> decreaseRequests,
- SchedulerApplicationAttempt attempt) {
- if (null == decreaseRequests || decreaseRequests.isEmpty()) {
- return;
- }
- // Pre-process decrease requests
- List<SchedContainerChangeRequest> schedDecreaseRequests =
- createSchedContainerChangeRequests(decreaseRequests, false);
- for (SchedContainerChangeRequest request : schedDecreaseRequests) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing decrease request:" + request);
- }
- // handle decrease request
- decreaseContainer(request, attempt);
- }
- }
-
- protected abstract void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt);
-
@Override
public N getSchedulerNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
@@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler
}
}
- protected void handleExecutionTypeUpdates(
- SchedulerApplicationAttempt appAttempt,
- List<UpdateContainerRequest> promotionRequests,
- List<UpdateContainerRequest> demotionRequests) {
+ protected void handleContainerUpdates(
+ SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
+ List<UpdateContainerRequest> promotionRequests =
+ updates.getPromotionRequests();
if (promotionRequests != null && !promotionRequests.isEmpty()) {
LOG.info("Promotion Update requests : " + promotionRequests);
- handlePromotionRequests(appAttempt, promotionRequests);
+ // Promotion is technically an increase request from
+ // 0 resources to target resources.
+ handleIncreaseRequests(appAttempt, promotionRequests);
}
+ List<UpdateContainerRequest> increaseRequests =
+ updates.getIncreaseRequests();
+ if (increaseRequests != null && !increaseRequests.isEmpty()) {
+ LOG.info("Resource increase requests : " + increaseRequests);
+ handleIncreaseRequests(appAttempt, increaseRequests);
+ }
+ List<UpdateContainerRequest> demotionRequests =
+ updates.getDemotionRequests();
if (demotionRequests != null && !demotionRequests.isEmpty()) {
LOG.info("Demotion Update requests : " + demotionRequests);
- handleDemotionRequests(appAttempt, demotionRequests);
+ // Demotion is technically a decrease request from the
+ // container's initial resources to 0 resources.
+ handleDecreaseRequests(appAttempt, demotionRequests);
+ }
+ List<UpdateContainerRequest> decreaseRequests =
+ updates.getDecreaseRequests();
+ if (decreaseRequests != null && !decreaseRequests.isEmpty()) {
+ LOG.info("Resource decrease requests : " + decreaseRequests);
+ handleDecreaseRequests(appAttempt, decreaseRequests);
}
}
- private void handlePromotionRequests(
+ private void handleIncreaseRequests(
SchedulerApplicationAttempt applicationAttempt,
List<UpdateContainerRequest> updateContainerRequests) {
for (UpdateContainerRequest uReq : updateContainerRequests) {
@@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler
}
}
- private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+ private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
List<UpdateContainerRequest> demotionRequests) {
OpportunisticContainerContext oppCntxt =
appAttempt.getOpportunisticContainerContext();
@@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer =
rmContext.getScheduler().getRMContainer(uReq.getContainerId());
if (rmContainer != null) {
- if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
- rmContainer.getContainer())) {
- RMContainer demotedRMContainer =
- createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
- appAttempt.addToNewlyDemotedContainers(
- uReq.getContainerId(), demotedRMContainer);
+ SchedulerNode schedulerNode = rmContext.getScheduler()
+ .getSchedulerNode(rmContainer.getContainer().getNodeId());
+ if (appAttempt.getUpdateContext()
+ .checkAndAddToOutstandingDecreases(uReq, schedulerNode,
+ rmContainer.getContainer())) {
+ if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE ==
+ uReq.getContainerUpdateType()) {
+ RMContainer demotedRMContainer =
+ createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+ appAttempt.addToNewlyDemotedContainers(
+ uReq.getContainerId(), demotedRMContainer);
+ } else {
+ RMContainer demotedRMContainer = createDecreasedRMContainer(
+ appAttempt, uReq, rmContainer);
+ appAttempt.addToNewlyDecreasedContainers(
+ uReq.getContainerId(), demotedRMContainer);
+ }
} else {
appAttempt.addToUpdateContainerErrors(
UpdateContainerError.newInstance(
RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
}
} else {
- LOG.warn("Cannot demote non-existent (or completed) Container ["
- + uReq.getContainerId() + "]");
+ LOG.warn("Cannot demote/decrease non-existent (or completed) " +
+ "Container [" + uReq.getContainerId() + "]");
}
}
}
+ private RMContainer createDecreasedRMContainer(
+ SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq,
+ RMContainer rmContainer) {
+ SchedulerRequestKey sk =
+ SchedulerRequestKey.extractFrom(rmContainer.getContainer());
+ Container decreasedContainer = BuilderUtils.newContainer(
+ ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+ appAttempt.getNewContainerId()),
+ rmContainer.getContainer().getNodeId(),
+ rmContainer.getContainer().getNodeHttpAddress(),
+ Resources.none(),
+ sk.getPriority(), null, rmContainer.getExecutionType(),
+ sk.getAllocationRequestId());
+ decreasedContainer.setVersion(rmContainer.getContainer().getVersion());
+ RMContainer newRmContainer = new RMContainerImpl(decreasedContainer,
+ sk, appAttempt.getApplicationAttemptId(),
+ decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext,
+ rmContainer.isRemotelyAllocated());
+ appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer);
+ ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+ decreasedContainer.getNodeId()).allocateContainer(newRmContainer);
+ return newRmContainer;
+ }
+
private RMContainer createDemotedRMContainer(
SchedulerApplicationAttempt appAttempt,
OpportunisticContainerContext oppCntxt,
@@ -1162,4 +1225,36 @@ public abstract class AbstractYarnScheduler
return SchedulerUtils.createOpportunisticRmContainer(
rmContext, demotedContainer, false);
}
+
+ /**
+  * Rollback container update after expiry.
+  * <p>
+  * The rollback is skipped (and logged) when either the container or its
+  * current application attempt cannot be found. Otherwise, if the last
+  * confirmed resource fits in the container's current resource, a
+  * DECREASE_RESOURCE update targeting the last confirmed resource is
+  * handed to {@code handleDecreaseRequests}.
+  * @param containerId ContainerId.
+  */
+ protected void rollbackContainerUpdate(
+     ContainerId containerId) {
+   RMContainer rmContainer = getRMContainer(containerId);
+   if (rmContainer == null) {
+     LOG.info("Cannot rollback resource for container " + containerId
+         + ". The container does not exist.");
+     return;
+   }
+   // Null-check the very attempt instance that is passed on below, instead
+   // of performing a second, independent lookup.
+   T app = getCurrentAttemptForContainer(containerId);
+   if (app == null) {
+     LOG.info("Cannot rollback resource for container " + containerId
+         + ". The application that the container "
+         + "belongs to does not exist.");
+     return;
+   }
+
+   if (Resources.fitsIn(rmContainer.getLastConfirmedResource(),
+       rmContainer.getContainer().getResource())) {
+     LOG.info("Roll back resource for container " + containerId);
+     handleDecreaseRequests(app, Arrays.asList(
+         UpdateContainerRequest.newInstance(
+             rmContainer.getContainer().getVersion(),
+             rmContainer.getContainerId(),
+             ContainerUpdateType.DECREASE_RESOURCE,
+             rmContainer.getLastConfirmedResource(), null)));
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 48ecd2e..bff9c41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -90,9 +90,7 @@ public class AppSchedulingInfo {
schedulerKeys = new ConcurrentSkipListMap<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
- final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
- SchedContainerChangeRequest>>> containerIncreaseRequestMap =
- new ConcurrentHashMap<>();
+
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -158,137 +156,6 @@ public class AppSchedulingInfo {
LOG.info("Application " + applicationId + " requests cleared");
}
- public boolean hasIncreaseRequest(NodeId nodeId) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? false : requestsOnNode.size() > 0;
- } finally {
- this.readLock.unlock();
- }
- }
-
- public Map<ContainerId, SchedContainerChangeRequest>
- getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? null : requestsOnNode.get(
- schedulerKey);
- } finally {
- this.readLock.unlock();
- }
- }
-
- /**
- * return true if any of the existing increase requests are updated,
- * false if none of them are updated
- */
- public boolean updateIncreaseRequests(
- List<SchedContainerChangeRequest> increaseRequests) {
- boolean resourceUpdated = false;
-
- try {
- this.writeLock.lock();
- for (SchedContainerChangeRequest r : increaseRequests) {
- if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
- LOG.warn("rmContainer's state is not RUNNING, for increase request"
- + " with container-id=" + r.getContainerId());
- continue;
- }
- try {
- RMServerUtils.checkSchedContainerChangeRequest(r, true);
- } catch (YarnException e) {
- LOG.warn("Error happens when checking increase request, Ignoring.."
- + " exception=", e);
- continue;
- }
- NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- requestsOnNode = new TreeMap<>();
- containerIncreaseRequestMap.put(nodeId, requestsOnNode);
- }
-
- SchedContainerChangeRequest prevChangeRequest =
- getIncreaseRequest(nodeId,
- r.getRMContainer().getAllocatedSchedulerKey(),
- r.getContainerId());
- if (null != prevChangeRequest) {
- if (Resources.equals(prevChangeRequest.getTargetCapacity(),
- r.getTargetCapacity())) {
- // increase request hasn't changed
- continue;
- }
-
- // remove the old one, as we will use the new one going forward
- removeIncreaseRequest(nodeId,
- prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
- prevChangeRequest.getContainerId());
- }
-
- if (Resources.equals(r.getTargetCapacity(),
- r.getRMContainer().getAllocatedResource())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to increase container " + r.getContainerId()
- + ", target capacity = previous capacity = " + prevChangeRequest
- + ". Will ignore this increase request.");
- }
- continue;
- }
-
- // add the new one
- resourceUpdated = true;
- insertIncreaseRequest(r);
- }
- return resourceUpdated;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- /**
- * Insert increase request, adding any missing items in the data-structure
- * hierarchy.
- */
- private void insertIncreaseRequest(SchedContainerChangeRequest request) {
- NodeId nodeId = request.getNodeId();
- SchedulerRequestKey schedulerKey =
- request.getRMContainer().getAllocatedSchedulerKey();
- ContainerId containerId = request.getContainerId();
-
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- requestsOnNode = new HashMap<>();
- containerIncreaseRequestMap.put(nodeId, requestsOnNode);
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- if (null == requestsOnNodeWithPriority) {
- requestsOnNodeWithPriority = new TreeMap<>();
- requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
- incrementSchedulerKeyReference(schedulerKey);
- }
-
- requestsOnNodeWithPriority.put(containerId, request);
-
- // update resources
- String partition = request.getRMContainer().getNodeLabelExpression();
- Resource delta = request.getDeltaCapacity();
- appResourceUsage.incPending(partition, delta);
- queue.incPendingResource(partition, delta);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added increase request:" + request.getContainerId()
- + " delta=" + delta);
- }
- }
private void incrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) {
@@ -312,73 +179,6 @@ public class AppSchedulingInfo {
}
}
- public boolean removeIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- this.writeLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return false;
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- if (null == requestsOnNodeWithPriority) {
- return false;
- }
-
- SchedContainerChangeRequest request =
- requestsOnNodeWithPriority.remove(containerId);
-
- // remove hierarchies if it becomes empty
- if (requestsOnNodeWithPriority.isEmpty()) {
- requestsOnNode.remove(schedulerKey);
- decrementSchedulerKeyReference(schedulerKey);
- }
- if (requestsOnNode.isEmpty()) {
- containerIncreaseRequestMap.remove(nodeId);
- }
-
- if (request == null) {
- return false;
- }
-
- // update queue's pending resource if request exists
- String partition = request.getRMContainer().getNodeLabelExpression();
- Resource delta = request.getDeltaCapacity();
- appResourceUsage.decPending(partition, delta);
- queue.decPendingResource(partition, delta);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remove increase request:" + request);
- }
-
- return true;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return null;
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- return requestsOnNodeWithPriority == null ? null
- : requestsOnNodeWithPriority.get(containerId);
- } finally {
- this.readLock.unlock();
- }
- }
-
public ContainerUpdateContext getUpdateContext() {
return updateContext;
}
@@ -514,21 +314,6 @@ public class AppSchedulingInfo {
appResourceUsage.decPending(partition, toDecrease);
}
- private boolean hasRequestLabelChanged(ResourceRequest requestOne,
- ResourceRequest requestTwo) {
- String requestOneLabelExp = requestOne.getNodeLabelExpression();
- String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
- // First request label expression can be null and second request
- // is not null then we have to consider it as changed.
- if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
- return true;
- }
- // If the label is not matching between both request when
- // requestOneLabelExp is not null.
- return ((null != requestOneLabelExp) && !(requestOneLabelExp
- .equals(requestTwoLabelExp)));
- }
-
/**
* The ApplicationMaster is updating the placesBlacklistedByApp used for
* containers other than AMs.
@@ -601,22 +386,6 @@ public class AppSchedulingInfo {
return ret;
}
- public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
- try {
- readLock.lock();
- for (SchedulerRequestKey key : schedulerKeys.keySet()) {
- SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
- if (null != ps) {
- return ps;
- }
- }
- return null;
- } finally {
- readLock.unlock();
- }
-
- }
-
public PendingAsk getNextPendingAsk() {
try {
readLock.lock();
@@ -666,56 +435,6 @@ public class AppSchedulingInfo {
}
}
- public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
- NodeId nodeId = increaseRequest.getNodeId();
- SchedulerRequestKey schedulerKey =
- increaseRequest.getRMContainer().getAllocatedSchedulerKey();
- ContainerId containerId = increaseRequest.getContainerId();
- Resource deltaCapacity = increaseRequest.getDeltaCapacity();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocated increase request : applicationId=" + applicationId
- + " container=" + containerId + " host="
- + increaseRequest.getNodeId() + " user=" + user + " resource="
- + deltaCapacity);
- }
- try {
- this.writeLock.lock();
- // Set queue metrics
- queue.getMetrics().allocateResources(user, deltaCapacity);
- // remove the increase request from pending increase request map
- removeIncreaseRequest(nodeId, schedulerKey, containerId);
- // update usage
- appResourceUsage.incUsed(increaseRequest.getNodePartition(),
- deltaCapacity);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
- // Delta is negative when it's a decrease request
- Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrease container : applicationId=" + applicationId
- + " container=" + decreaseRequest.getContainerId() + " host="
- + decreaseRequest.getNodeId() + " user=" + user + " resource="
- + absDelta);
- }
-
- try {
- this.writeLock.lock();
- // Set queue metrics
- queue.getMetrics().releaseResources(user, absDelta);
-
- // update usage
- appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
- } finally {
- this.writeLock.unlock();
- }
- }
-
public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 7381250..5ac2ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -28,17 +28,19 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+ .RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ .SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,43 +60,37 @@ public class ContainerUpdateContext {
private final Map<SchedulerRequestKey, Map<Resource,
Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
- private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+ private final Map<ContainerId, Resource> outstandingDecreases =
+ new HashMap<>();
private final AppSchedulingInfo appSchedulingInfo;
ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
this.appSchedulingInfo = appSchedulingInfo;
}
- private synchronized boolean isBeingIncreased(Container container) {
- Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
- outstandingIncreases.get(
- new SchedulerRequestKey(container.getPriority(),
- container.getAllocationRequestId(), container.getId()));
- if (resourceMap != null) {
- Map<NodeId, Set<ContainerId>> locationMap =
- resourceMap.get(container.getResource());
- if (locationMap != null) {
- Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
- if (containerIds != null && !containerIds.isEmpty()) {
- return containerIds.contains(container.getId());
- }
- }
- }
- return false;
- }
-
/**
* Add the container to outstanding decreases.
+ * @param updateReq UpdateContainerRequest.
+ * @param schedulerNode SchedulerNode.
* @param container Container.
- * @return true if updated to outstanding decreases was successful.
+ * @return If it was possible to decrease the container.
*/
public synchronized boolean checkAndAddToOutstandingDecreases(
+ UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
Container container) {
- if (isBeingIncreased(container)
- || outstandingDecreases.contains(container.getId())) {
+ if (outstandingDecreases.containsKey(container.getId())) {
return false;
}
- outstandingDecreases.add(container.getId());
+ if (ContainerUpdateType.DECREASE_RESOURCE ==
+ updateReq.getContainerUpdateType()) {
+ SchedulerRequestKey updateKey = new SchedulerRequestKey
+ (container.getPriority(),
+ container.getAllocationRequestId(), container.getId());
+ cancelPreviousRequest(schedulerNode, updateKey);
+ outstandingDecreases.put(container.getId(), updateReq.getCapability());
+ } else {
+ outstandingDecreases.put(container.getId(), container.getResource());
+ }
return true;
}
@@ -117,35 +113,63 @@ public class ContainerUpdateContext {
if (resourceMap == null) {
resourceMap = new HashMap<>();
outstandingIncreases.put(schedulerKey, resourceMap);
+ } else {
+ // Updating Resource for an existing increase container
+ if (ContainerUpdateType.INCREASE_RESOURCE ==
+ updateRequest.getContainerUpdateType()) {
+ cancelPreviousRequest(schedulerNode, schedulerKey);
+ } else {
+ return false;
+ }
}
+ Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
Map<NodeId, Set<ContainerId>> locationMap =
- resourceMap.get(container.getResource());
+ resourceMap.get(resToIncrease);
if (locationMap == null) {
locationMap = new HashMap<>();
- resourceMap.put(container.getResource(), locationMap);
+ resourceMap.put(resToIncrease, locationMap);
}
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds == null) {
containerIds = new HashSet<>();
locationMap.put(container.getNodeId(), containerIds);
}
- if (containerIds.contains(container.getId())
- || outstandingDecreases.contains(container.getId())) {
+ if (outstandingDecreases.containsKey(container.getId())) {
return false;
}
- containerIds.add(container.getId());
- Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
- new HashMap<>();
- Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
- Map<String, ResourceRequest> resMap =
- createResourceRequests(rmContainer, schedulerNode,
- schedulerKey, resToIncrease);
- updateResReqs.put(schedulerKey, resMap);
- appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+ containerIds.add(container.getId());
+ if (!Resources.isNone(resToIncrease)) {
+ Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+ new HashMap<>();
+ Map<String, ResourceRequest> resMap =
+ createResourceRequests(rmContainer, schedulerNode,
+ schedulerKey, resToIncrease);
+ updateResReqs.put(schedulerKey, resMap);
+ appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+ }
return true;
}
+ /**
+  * Cancels the request previously registered under the given scheduler key,
+  * if there is one, by reporting a placeholder OFF_SWITCH allocation of the
+  * same capability to the AppSchedulingInfo.
+  * @param schedulerNode SchedulerNode used for the placeholder allocation.
+  * @param schedulerKey SchedulerRequestKey of the request to cancel.
+  */
+ private void cancelPreviousRequest(SchedulerNode schedulerNode,
+     SchedulerRequestKey schedulerKey) {
+   SchedulingPlacementSet<SchedulerNode> placementSet =
+       appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+   if (placementSet == null) {
+     return;
+   }
+   ResourceRequest previousAsk =
+       placementSet.getResourceRequests().get(ResourceRequest.ANY);
+   if (previousAsk == null) {
+     return;
+   }
+   // Decrement the pending using a dummy RR with
+   // resource = prev update req capability
+   appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
+       schedulerKey, Container.newInstance(UNDEFINED,
+           schedulerNode.getNodeID(), "host:port",
+           previousAsk.getCapability(), schedulerKey.getPriority(), null));
+ }
+
private Map<String, ResourceRequest> createResourceRequests(
RMContainer rmContainer, SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey, Resource resToIncrease) {
@@ -171,10 +195,16 @@ public class ContainerUpdateContext {
ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
return rmContainer.getContainer().getResource();
}
- // TODO: Fix this for container increase..
- // This has to equal the Resources in excess of fitsIn()
- // for container increase and is equal to the container total
- // resource for Promotion.
+ if (updateReq.getContainerUpdateType() ==
+ ContainerUpdateType.INCREASE_RESOURCE) {
+ // This has to equal the Resources in excess of fitsIn()
+ // for container increase and is equal to the container total
+ // resource for Promotion.
+ Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
+ rmContainer.getContainer().getResource());
+ return Resources.add(maxCap,
+ Resources.negate(rmContainer.getContainer().getResource()));
+ }
return null;
}
@@ -228,6 +258,7 @@ public class ContainerUpdateContext {
/**
* Check if a new container is to be matched up against an outstanding
* Container increase request.
+ * @param node SchedulerNode.
* @param schedulerKey SchedulerRequestKey.
* @param rmContainer RMContainer.
* @return ContainerId.
@@ -264,4 +295,80 @@ public class ContainerUpdateContext {
}
return retVal;
}
+
+ /**
+ * Swaps the internal container references of the existing RMContainer and
+ * the temporary RMContainer after adjusting the resources in each.
+ * The existing RMContainer receives a new Container that keeps its id,
+ * node id, node HTTP address, priority, allocation request id and version,
+ * but carries the updated resource and the temporary container's execution
+ * type. The temporary container is left holding the resource to release
+ * and the existing container's previous execution type.
+ * @param tempRMContainer Temp RMContainer.
+ * @param existingRMContainer Existing RMContainer.
+ * @param updateType Update Type.
+ * @return Existing RMContainer after swapping the container references.
+ */
+ // NOTE(review): this method is not synchronized, yet the helpers it calls
+ // read outstandingDecreases - confirm callers provide the needed locking.
+ public RMContainer swapContainer(RMContainer tempRMContainer,
+ RMContainer existingRMContainer, ContainerUpdateType updateType) {
+ ContainerId matchedContainerId = existingRMContainer.getContainerId();
+ // Swap updated container with the existing container
+ Container tempContainer = tempRMContainer.getContainer();
+
+ // Both resources are computed from the existing container's current
+ // state, before either container is modified below.
+ Resource updatedResource = createUpdatedResource(
+ tempContainer, existingRMContainer.getContainer(), updateType);
+ Resource resourceToRelease = createResourceToRelease(
+ existingRMContainer.getContainer(), updateType);
+ Container newContainer = Container.newInstance(matchedContainerId,
+ existingRMContainer.getContainer().getNodeId(),
+ existingRMContainer.getContainer().getNodeHttpAddress(),
+ updatedResource,
+ existingRMContainer.getContainer().getPriority(), null,
+ tempContainer.getExecutionType());
+ newContainer.setAllocationRequestId(
+ existingRMContainer.getContainer().getAllocationRequestId());
+ newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+ // The temp container takes over what has to be released, together with
+ // the execution type the existing container had before the swap.
+ tempRMContainer.getContainer().setResource(resourceToRelease);
+ tempRMContainer.getContainer().setExecutionType(
+ existingRMContainer.getContainer().getExecutionType());
+
+ ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+ return existingRMContainer;
+ }
+
+ /**
+  * Computes the resource the existing container ends up with once the
+  * update operation has completed.
+  * @param tempContainer Temporary Container created for the operation.
+  * @param existingContainer Existing Container.
+  * @param updateType Update Type.
+  * @return Final Resource.
+  */
+ private Resource createUpdatedResource(Container tempContainer,
+     Container existingContainer, ContainerUpdateType updateType) {
+   if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+     // Increase: existing resource plus what the temp container holds.
+     return Resources.add(existingContainer.getResource(),
+         tempContainer.getResource());
+   }
+   if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+     // Decrease: the target recorded in the outstanding decreases.
+     return outstandingDecreases.get(existingContainer.getId());
+   }
+   // Any other update type leaves the resource as it is.
+   return existingContainer.getResource();
+ }
+
+ /**
+  * Computes the resources that have to be released once the update
+  * operation has completed.
+  * @param existingContainer Existing Container.
+  * @param updateType Updated type.
+  * @return Resources to be released.
+  */
+ private Resource createResourceToRelease(Container existingContainer,
+     ContainerUpdateType updateType) {
+   if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+     return Resources.none();
+   }
+   if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+     // Existing resource minus the target recorded for this container.
+     Resource current = existingContainer.getResource();
+     Resource target = outstandingDecreases.get(existingContainer.getId());
+     return Resources.add(current, Resources.negate(target));
+   }
+   return existingContainer.getResource();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 0e79838..f894a40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+ .RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+ protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
- protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
protected Set<NMToken> updatedNMTokens = new HashSet<>();
@@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType));
+ if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+ Collections.singletonList(rmContainer.getContainer())));
+ }
}
return container;
}
@@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
- public void addToNewlyDemotedContainers(ContainerId containerId,
+ public synchronized void addToNewlyDemotedContainers(ContainerId containerId,
RMContainer rmContainer) {
newlyDemotedContainers.put(containerId, rmContainer);
}
+ /**
+ * Records an RMContainer in the newly decreased containers map under the
+ * given container id, replacing any entry already stored for that id.
+ * @param containerId ContainerId used as the map key.
+ * @param rmContainer RMContainer to store.
+ */
+ public synchronized void addToNewlyDecreasedContainers(
+ ContainerId containerId, RMContainer rmContainer) {
+ newlyDecreasedContainers.put(containerId, rmContainer);
+ }
+
protected synchronized void addToUpdateContainerErrors(
UpdateContainerError error) {
updateContainerErrors.add(error);
@@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) {
- if (oppContainerContext == null) {
- newlyAllocatedContainers.add(rmContainer);
- return;
- }
ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq(
node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
@@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// occurs when using MiniYARNCluster to test).
tempContainerToKill.add(rmContainer);
} else {
- newlyPromotedContainers.put(matchedContainerId, rmContainer);
+ RMContainer existingContainer = getRMContainer(matchedContainerId);
+ // If this container was already GUARANTEED, then it is an
+ // increase, else it is a promotion
+ if (existingContainer == null ||
+ EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED,
+ RMContainerState.EXPIRED, RMContainerState.RELEASED).contains(
+ existingContainer.getState())) {
+ tempContainerToKill.add(rmContainer);
+ } else {
+ if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) {
+ newlyIncreasedContainers.put(matchedContainerId, rmContainer);
+ } else {
+ newlyPromotedContainers.put(matchedContainerId, rmContainer);
+ }
+ }
}
} else {
newlyAllocatedContainers.add(rmContainer);
@@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public List<Container> pullNewlyPromotedContainers() {
- return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+ return pullNewlyUpdatedContainers(newlyPromotedContainers,
ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
}
public List<Container> pullNewlyDemotedContainers() {
- return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+ return pullNewlyUpdatedContainers(newlyDemotedContainers,
ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
}
+ /**
+ * Pulls the containers recorded as newly increased by delegating to
+ * pullNewlyUpdatedContainers with the INCREASE_RESOURCE update type.
+ * @return Containers produced for the newly increased entries.
+ */
+ public List<Container> pullNewlyIncreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyIncreasedContainers,
+ ContainerUpdateType.INCREASE_RESOURCE);
+ }
+
+ /**
+ * Pulls the containers recorded as newly decreased by delegating to
+ * pullNewlyUpdatedContainers with the DECREASE_RESOURCE update type.
+ * @return Containers produced for the newly decreased entries.
+ */
+ public List<Container> pullNewlyDecreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyDecreasedContainers,
+ ContainerUpdateType.DECREASE_RESOURCE);
+ }
+
public List<UpdateContainerError> pullUpdateContainerErrors() {
List<UpdateContainerError> errors =
new ArrayList<>(updateContainerErrors);
@@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* GUARANTEED to OPPORTUNISTIC.
* @return Newly Promoted and Demoted containers
*/
- private List<Container> pullContainersWithUpdatedExecType(
+ private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> newlyUpdatedContainers,
ContainerUpdateType updateTpe) {
List<Container> updatedContainers = new ArrayList<>();
- if (oppContainerContext == null) {
+ if (oppContainerContext == null &&
+ (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe
+ || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
return updatedContainers;
}
try {
@@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
while (i.hasNext()) {
Map.Entry<ContainerId, RMContainer> entry = i.next();
ContainerId matchedContainerId = entry.getKey();
- RMContainer rmContainer = entry.getValue();
-
- // swap containers
- RMContainer existingRMContainer = swapContainer(
- rmContainer, matchedContainerId);
- getUpdateContext().removeFromOutstandingUpdate(
- rmContainer.getAllocatedSchedulerKey(),
- existingRMContainer.getContainer());
- Container updatedContainer = updateContainerAndNMToken(
- existingRMContainer, updateTpe);
- updatedContainers.add(updatedContainer);
-
- tempContainerToKill.add(rmContainer);
+ RMContainer tempRMContainer = entry.getValue();
+
+ RMContainer existingRMContainer =
+ getRMContainer(matchedContainerId);
+ if (existingRMContainer != null) {
+ // swap containers
+ existingRMContainer = getUpdateContext().swapContainer(
+ tempRMContainer, existingRMContainer, updateTpe);
+ getUpdateContext().removeFromOutstandingUpdate(
+ tempRMContainer.getAllocatedSchedulerKey(),
+ existingRMContainer.getContainer());
+ Container updatedContainer = updateContainerAndNMToken(
+ existingRMContainer, updateTpe);
+ updatedContainers.add(updatedContainer);
+ }
+ tempContainerToKill.add(tempRMContainer);
i.remove();
}
// Release all temporary containers
@@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
- private RMContainer swapContainer(RMContainer rmContainer, ContainerId
- matchedContainerId) {
- RMContainer existingRMContainer =
- getRMContainer(matchedContainerId);
- if (existingRMContainer != null) {
- // Swap updated container with the existing container
- Container updatedContainer = rmContainer.getContainer();
-
- Container newContainer = Container.newInstance(matchedContainerId,
- existingRMContainer.getContainer().getNodeId(),
- existingRMContainer.getContainer().getNodeHttpAddress(),
- updatedContainer.getResource(),
- existingRMContainer.getContainer().getPriority(), null,
- updatedContainer.getExecutionType());
- newContainer.setAllocationRequestId(
- existingRMContainer.getContainer().getAllocationRequestId());
- newContainer.setVersion(existingRMContainer.getContainer().getVersion());
-
- rmContainer.getContainer().setResource(
- existingRMContainer.getContainer().getResource());
- rmContainer.getContainer().setExecutionType(
- existingRMContainer.getContainer().getExecutionType());
-
- ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
- }
- return existingRMContainer;
- }
-
- private List<Container> pullNewlyUpdatedContainers(
- Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
- try {
- writeLock.lock();
- List <Container> returnContainerList = new ArrayList <Container>(
- updatedContainerMap.size());
-
- Iterator<Entry<ContainerId, RMContainer>> i =
- updatedContainerMap.entrySet().iterator();
- while (i.hasNext()) {
- RMContainer rmContainer = i.next().getValue();
- Container updatedContainer = updateContainerAndNMToken(rmContainer,
- increase ? ContainerUpdateType.INCREASE_RESOURCE :
- ContainerUpdateType.DECREASE_RESOURCE);
- if (updatedContainer != null) {
- returnContainerList.add(updatedContainer);
- i.remove();
- }
- }
- return returnContainerList;
- } finally {
- writeLock.unlock();
- }
-
- }
-
- public List<Container> pullNewlyIncreasedContainers() {
- return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
- }
-
- public List<Container> pullNewlyDecreasedContainers() {
- return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
- }
-
public List<NMToken> pullUpdatedNMTokens() {
try {
writeLock.lock();
@@ -1252,68 +1228,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public ResourceUsage getSchedulingResourceUsage() {
return attemptResourceUsage;
}
-
- public boolean removeIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- writeLock.lock();
- return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
- containerId);
- } finally {
- writeLock.unlock();
- }
- }
-
- public boolean updateIncreaseRequests(
- List<SchedContainerChangeRequest> increaseRequests) {
- try {
- writeLock.lock();
- return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
- } finally {
- writeLock.unlock();
- }
- }
-
- private void changeContainerResource(
- SchedContainerChangeRequest changeRequest, boolean increase) {
- try {
- writeLock.lock();
- if (increase) {
- appSchedulingInfo.increaseContainer(changeRequest);
- } else{
- appSchedulingInfo.decreaseContainer(changeRequest);
- }
-
- RMContainer changedRMContainer = changeRequest.getRMContainer();
- changedRMContainer.handle(
- new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
- changeRequest.getTargetCapacity(), increase));
-
- // remove pending and not pulled by AM newly-increased or
- // decreased-containers and add the new one
- if (increase) {
- newlyDecreasedContainers.remove(changeRequest.getContainerId());
- newlyIncreasedContainers.put(changeRequest.getContainerId(),
- changedRMContainer);
- } else{
- newlyIncreasedContainers.remove(changeRequest.getContainerId());
- newlyDecreasedContainers.put(changeRequest.getContainerId(),
- changedRMContainer);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest) {
- changeContainerResource(decreaseRequest, false);
- }
-
- public void increaseContainer(
- SchedContainerChangeRequest increaseRequest) {
- changeContainerResource(increaseRequest, true);
- }
public void setAppAMNodePartitionName(String partitionName) {
this.appAMNodePartitionName = partitionName;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 9c2dff3..db17b42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -180,49 +180,6 @@ public abstract class SchedulerNode {
}
/**
- * Change the resources allocated for a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Change in the resource allocation.
- * @param increase True if the change is an increase of allocation.
- */
- protected synchronized void changeContainerResource(ContainerId containerId,
- Resource deltaResource, boolean increase) {
- if (increase) {
- deductUnallocatedResource(deltaResource);
- } else {
- addUnallocatedResource(deltaResource);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug((increase ? "Increased" : "Decreased") + " container "
- + containerId + " of capacity " + deltaResource + " on host "
- + rmNode.getNodeAddress() + ", which has " + numContainers
- + " containers, " + getAllocatedResource() + " used and "
- + getUnallocatedResource() + " available after allocation");
- }
- }
-
- /**
- * Increase the resources allocated to a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Increase of resource allocation.
- */
- public synchronized void increaseContainer(ContainerId containerId,
- Resource deltaResource) {
- changeContainerResource(containerId, deltaResource, true);
- }
-
- /**
- * Decrease the resources allocated to a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Decrease of resource allocation.
- */
- public synchronized void decreaseContainer(ContainerId containerId,
- Resource deltaResource) {
- changeContainerResource(containerId, deltaResource, false);
- }
-
- /**
* Get unallocated resources on the node.
* @return Unallocated resources on the node
*/
@@ -280,7 +237,6 @@ public abstract class SchedulerNode {
if (info == null) {
return;
}
-
if (!releasedByNode && info.launchedOnNode) {
// wait until node reports container has completed
return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e9ef319..aa60c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -453,14 +453,13 @@ public abstract class AbstractCSQueue implements CSQueue {
}
void allocateResource(Resource clusterResource,
- Resource resource, String nodePartition, boolean changeContainerResource) {
+ Resource resource, String nodePartition) {
try {
writeLock.lock();
queueUsage.incUsed(nodePartition, resource);
- if (!changeContainerResource) {
- ++numContainers;
- }
+ ++numContainers;
+
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
} finally {
@@ -469,7 +468,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
protected void releaseResource(Resource clusterResource,
- Resource resource, String nodePartition, boolean changeContainerResource) {
+ Resource resource, String nodePartition) {
try {
writeLock.lock();
queueUsage.decUsed(nodePartition, resource);
@@ -477,9 +476,7 @@ public abstract class AbstractCSQueue implements CSQueue {
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
- if (!changeContainerResource) {
- --numContainers;
- }
+ --numContainers;
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a65b3d2..6d30386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -231,14 +231,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
boolean sortQueues);
/**
- * We have a reserved increased container in the queue, we need to unreserve
- * it. Since we just want to cancel the reserved increase request instead of
- * stop the container, we shouldn't call completedContainer for such purpose.
- */
- public void unreserveIncreasedContainer(Resource clusterResource,
- FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
-
- /**
* Get the number of applications in the queue.
* @return number of applications
*/
@@ -333,13 +325,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* new resource asked
*/
public void decPendingResource(String nodeLabel, Resource resourceToDec);
-
- /**
- * Decrease container resource in the queue
- */
- public void decreaseContainer(Resource clusterResource,
- SchedContainerChangeRequest decreaseRequest,
- FiCaSchedulerApp app) throws InvalidResourceRequestException;
/**
* Get valid Node Labels for this queue
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3517764..20ea607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -60,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -85,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -99,7 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -872,43 +868,6 @@ public class CapacityScheduler extends
}
}
- private LeafQueue updateIncreaseRequests(
- List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
- // When application has some pending to-be-removed resource requests,
- app.removedToBeRemovedIncreaseRequests();
-
- if (null == increaseRequests || increaseRequests.isEmpty()) {
- return null;
- }
-
- // Pre-process increase requests
- List<SchedContainerChangeRequest> schedIncreaseRequests =
- createSchedContainerChangeRequests(increaseRequests, true);
- LeafQueue leafQueue = (LeafQueue) app.getQueue();
-
- try {
- /*
- * Acquire application's lock here to make sure application won't
- * finish when updateIncreaseRequest is called.
- */
- app.getWriteLock().lock();
- // make sure we aren't stopping/removing the application
- // when the allocate comes in
- if (app.isStopped()) {
- return null;
- }
- // Process increase resource requests
- if (app.updateIncreaseRequests(schedIncreaseRequests)) {
- return leafQueue;
- }
- } finally {
- app.getWriteLock().unlock();
- }
-
-
- return null;
- }
-
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
@@ -920,21 +879,13 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
- // Handle promotions and demotions
- handleExecutionTypeUpdates(
- application, updateRequests.getPromotionRequests(),
- updateRequests.getDemotionRequests());
+ // Handle all container updates
+ handleContainerUpdates(application, updateRequests);
// Release containers
releaseContainers(release, application);
- // update increase requests
- LeafQueue updateDemandForQueue =
- updateIncreaseRequests(updateRequests.getIncreaseRequests(),
- application);
-
- // Decrease containers
- decreaseContainers(updateRequests.getDecreaseRequests(), application);
+ LeafQueue updateDemandForQueue = null;
// Sanity check for new allocation requests
normalizeRequests(ask);
@@ -959,8 +910,7 @@ public class CapacityScheduler extends
}
// Update application requests
- if (application.updateResourceRequests(ask) && (updateDemandForQueue
- == null)) {
+ if (application.updateResourceRequests(ask)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
@@ -1466,7 +1416,7 @@ public class CapacityScheduler extends
(ContainerExpiredSchedulerEvent) event;
ContainerId containerId = containerExpiredEvent.getContainerId();
if (containerExpiredEvent.isIncrease()) {
- rollbackContainerResource(containerId);
+ rollbackContainerUpdate(containerId);
} else {
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
@@ -1618,31 +1568,6 @@ public class CapacityScheduler extends
}
}
- private void rollbackContainerResource(
- ContainerId containerId) {
- RMContainer rmContainer = getRMContainer(containerId);
- if (rmContainer == null) {
- LOG.info("Cannot rollback resource for container " + containerId
- + ". The container does not exist.");
- return;
- }
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Cannot rollback resource for container " + containerId
- + ". The application that the container "
- + "belongs to does not exist.");
- return;
- }
- LOG.info("Roll back resource for container " + containerId);
-
- SchedulerNode schedulerNode = getSchedulerNode(
- rmContainer.getAllocatedNode());
- SchedContainerChangeRequest decreaseRequest =
- new SchedContainerChangeRequest(this.rmContext, schedulerNode,
- rmContainer, rmContainer.getLastConfirmedResource());
- decreaseContainer(decreaseRequest, application);
- }
-
@Override
protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
@@ -1676,32 +1601,6 @@ public class CapacityScheduler extends
rmContainer, containerStatus, event, null, true);
}
- @Override
- protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- RMContainer rmContainer = decreaseRequest.getRMContainer();
- // Check container status before doing decrease
- if (rmContainer.getState() != RMContainerState.RUNNING) {
- LOG.info(
- "Trying to decrease a container not in RUNNING state, container="
- + rmContainer + " state=" + rmContainer.getState().name());
- return;
- }
- FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
- LeafQueue queue = (LeafQueue) attempt.getQueue();
- try {
- queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
- // Notify RMNode that the container can be pulled by NodeManager in the
- // next heartbeat
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
- Collections.singletonList(rmContainer.getContainer())));
- } catch (InvalidResourceRequestException e) {
- LOG.warn("Error happens when checking decrease request, Ignoring.."
- + " exception=", e);
- }
- }
-
@Lock(Lock.NoLock.class)
@VisibleForTesting
@Override
@@ -2386,8 +2285,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, true),
getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.isIncreasedAllocation(),
- csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ false), csAssignment.getType(),
+ csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
@@ -2403,8 +2302,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, false),
getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.isIncreasedAllocation(),
- csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ false), csAssignment.getType(),
+ csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org