You are viewing a plain-text version of this content. The canonical version is linked from the original archive page; that link is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/22 20:27:03 UTC
[29/32] hadoop git commit: YARN-1651. CapacityScheduler side changes
to support container resize. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 869b49a..2ab060e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -60,10 +59,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
@@ -730,17 +729,22 @@ public class LeafQueue extends AbstractCSQueue {
}
private void handleExcessReservedContainer(Resource clusterResource,
- CSAssignment assignment) {
+ CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) {
if (assignment.getExcessReservation() != null) {
RMContainer excessReservedContainer = assignment.getExcessReservation();
-
- completedContainer(clusterResource, assignment.getApplication(),
- scheduler.getNode(excessReservedContainer.getAllocatedNode()),
- excessReservedContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- excessReservedContainer.getContainerId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, false);
+
+ if (excessReservedContainer.hasIncreaseReservation()) {
+ unreserveIncreasedContainer(clusterResource,
+ app, node, excessReservedContainer);
+ } else {
+ completedContainer(clusterResource, assignment.getApplication(),
+ scheduler.getNode(excessReservedContainer.getAllocatedNode()),
+ excessReservedContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ excessReservedContainer.getContainerId(),
+ SchedulerUtils.UNRESERVED_CONTAINER),
+ RMContainerEventType.RELEASED, null, false);
+ }
assignment.setExcessReservation(null);
}
@@ -766,7 +770,8 @@ public class LeafQueue extends AbstractCSQueue {
CSAssignment assignment =
application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode, reservedContainer);
- handleExcessReservedContainer(clusterResource, assignment);
+ handleExcessReservedContainer(clusterResource, assignment, node,
+ application);
return assignment;
}
}
@@ -824,7 +829,8 @@ public class LeafQueue extends AbstractCSQueue {
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
- handleExcessReservedContainer(clusterResource, assignment);
+ handleExcessReservedContainer(clusterResource, assignment, node,
+ application);
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
Resources.none())) {
@@ -836,7 +842,8 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
- node.getPartition(), reservedOrAllocatedRMContainer);
+ node.getPartition(), reservedOrAllocatedRMContainer,
+ assignment.isIncreasedAllocation());
// Done
return assignment;
@@ -1086,6 +1093,37 @@ public class LeafQueue extends AbstractCSQueue {
}
return true;
}
+
+  /**
+   * Releases the increase reservation that {@code rmContainer} holds on
+   * {@code node} and propagates the release to the parent queue.
+   *
+   * <p>The application-level unreserve, the ordering-policy notification and
+   * this queue's resource accounting all happen under this queue's lock. The
+   * parent is notified only after that lock is released, and only if the
+   * reservation was actually removed.
+   *
+   * @param clusterResource total cluster resource
+   * @param app application that owns the reservation
+   * @param node node on which the increase was reserved
+   * @param rmContainer container whose increase reservation is released
+   */
+  @Override
+  public void unreserveIncreasedContainer(Resource clusterResource,
+      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
+    boolean removed = false;
+
+    synchronized (this) {
+      // Without its Container record there is no priority to look the
+      // reservation up by, so there is nothing to unreserve.
+      Priority priority = rmContainer.getContainer() == null
+          ? null : rmContainer.getContainer().getPriority();
+
+      if (priority != null) {
+        removed = app.unreserve(priority, node, rmContainer);
+      }
+
+      if (removed) {
+        // Inform the ordering policy
+        orderingPolicy.containerReleased(app, rmContainer);
+
+        // true = isChangeResource: this releases a resize of a live
+        // container, not a completed container.
+        releaseResource(clusterResource, app, rmContainer.getReservedResource(),
+            node.getPartition(), rmContainer, true);
+      }
+    }
+
+    // Propagate outside this queue's lock. Guard against a missing parent the
+    // same way decreaseContainer does.
+    if (removed && getParent() != null) {
+      getParent().unreserveIncreasedContainer(clusterResource, app, node,
+          rmContainer);
+    }
+  }
@Override
public void completedContainer(Resource clusterResource,
@@ -1093,6 +1131,15 @@ public class LeafQueue extends AbstractCSQueue {
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
if (application != null) {
+ // unreserve container increase request if it previously reserved.
+ if (rmContainer.hasIncreaseReservation()) {
+ unreserveIncreasedContainer(clusterResource, application, node,
+ rmContainer);
+ }
+
+ // Remove container increase request if it exists
+ application.removeIncreaseRequest(node.getNodeID(),
+ rmContainer.getAllocatedPriority(), rmContainer.getContainerId());
boolean removed = false;
@@ -1123,7 +1170,7 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.containerReleased(application, rmContainer);
releaseResource(clusterResource, application, container.getResource(),
- node.getPartition(), rmContainer);
+ node.getPartition(), rmContainer, false);
}
}
@@ -1137,8 +1184,10 @@ public class LeafQueue extends AbstractCSQueue {
synchronized void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
- String nodePartition, RMContainer rmContainer) {
- super.allocateResource(clusterResource, resource, nodePartition);
+ String nodePartition, RMContainer rmContainer,
+ boolean isIncreasedAllocation) {
+ super.allocateResource(clusterResource, resource, nodePartition,
+ isIncreasedAllocation);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1174,8 +1223,9 @@ public class LeafQueue extends AbstractCSQueue {
synchronized void releaseResource(Resource clusterResource,
FiCaSchedulerApp application, Resource resource, String nodePartition,
- RMContainer rmContainer) {
- super.releaseResource(clusterResource, resource, nodePartition);
+ RMContainer rmContainer, boolean isChangeResource) {
+ super.releaseResource(clusterResource, resource, nodePartition,
+ isChangeResource);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1363,7 +1413,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer);
+ .getResource(), node.getPartition(), rmContainer, false);
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1412,7 +1462,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer);
+ .getResource(), node.getPartition(), rmContainer, false);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1430,7 +1480,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer);
+ .getResource(), node.getPartition(), rmContainer, false);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1482,6 +1532,39 @@ public class LeafQueue extends AbstractCSQueue {
public Priority getDefaultApplicationPriority() {
return defaultAppPriorityPerQueue;
}
+
+ /**
+ * Applies a container decrease request to this leaf queue, then forwards
+ * it to the parent queue (if any).
+ *
+ * <p>Order of operations:
+ * <ol>
+ * <li>release any increase reservation still held for the container;</li>
+ * <li>under this queue's lock, release the decreased amount from queue usage
+ * and notify the application and the scheduler node;</li>
+ * <li>after that lock is released, notify the parent queue.</li>
+ * </ol>
+ *
+ * @param clusterResource total cluster resource
+ * @param decreaseRequest the decrease request; its delta capacity is negated
+ * to obtain the amount released (it is expected to be negative for a decrease)
+ * @param app application that owns the container being decreased
+ */
+ @Override
+ public void decreaseContainer(Resource clusterResource,
+ SchedContainerChangeRequest decreaseRequest,
+ FiCaSchedulerApp app) {
+ // If the container being decreased is reserved, we need to unreserve it
+ // first.
+ RMContainer rmContainer = decreaseRequest.getRMContainer();
+ if (rmContainer.hasIncreaseReservation()) {
+ unreserveIncreasedContainer(clusterResource, app,
+ (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
+ }
+
+ // Delta capacity is negative when it's a decrease request, so negating it
+ // yields the (positive) amount of resource being given back.
+ Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
+
+ synchronized (this) {
+ // Release the freed amount from this queue's usage. The trailing 'true' is
+ // the isChangeResource flag: a live container is being resized.
+ releaseResource(clusterResource, app, absDelta,
+ decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(),
+ true);
+ // Notify application
+ app.decreaseContainer(decreaseRequest);
+ // Notify node
+ decreaseRequest.getSchedulerNode()
+ .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
+ }
+
+ // Notify parent. This is done after this queue's lock has been released,
+ // so the parent's (synchronized) release is not entered while holding it.
+ if (getParent() != null) {
+ getParent().decreaseContainer(clusterResource, decreaseRequest, app);
+ }
+ }
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicy() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index e01204c..badab72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -430,7 +431,7 @@ public class ParentQueue extends AbstractCSQueue {
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
super.allocateResource(clusterResource, assignedToChild.getResource(),
- node.getPartition());
+ node.getPartition(), assignedToChild.isIncreasedAllocation());
// Track resource utilization in this pass of the scheduler
Resources
@@ -454,6 +455,8 @@ public class ParentQueue extends AbstractCSQueue {
.addAll(
assignedToChild.getAssignmentInformation()
.getReservationDetails());
+ assignment.setIncreasedAllocation(assignedToChild
+ .isIncreasedAllocation());
LOG.info("assignedContainer" +
" queue=" + getQueueName() +
@@ -616,6 +619,73 @@ public class ParentQueue extends AbstractCSQueue {
}
}
+ /**
+ * Releases {@code releasedResource} from this parent queue's usage and,
+ * when requested, re-positions the child queue that triggered the release
+ * within {@code childQueues}. Runs under this queue's lock.
+ *
+ * <p>Shared by completedContainer, decreaseContainer and
+ * unreserveIncreasedContainer.
+ *
+ * @param clusterResource total cluster resource
+ * @param node node the released resource was on; only its partition is used
+ * @param releasedResource amount of resource to release
+ * @param changeResource forwarded unchanged to the superclass releaseResource;
+ * presumably distinguishes resizing a live container from completing one
+ * @param completedChildQueue child queue to re-insert when sorting; callers
+ * that do not sort pass null
+ * @param sortQueues whether to re-insert completedChildQueue now; when false,
+ * a full re-sort is requested for the next allocation instead
+ */
+ private synchronized void internalReleaseResource(Resource clusterResource,
+ FiCaSchedulerNode node, Resource releasedResource, boolean changeResource,
+ CSQueue completedChildQueue, boolean sortQueues) {
+ super.releaseResource(clusterResource,
+ releasedResource, node.getPartition(),
+ changeResource);
+
+ // NOTE(review): this message says "completedContainer" even when reached
+ // from the decrease / unreserve-increase paths.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
+ }
+
+ // Note that this is using an iterator on the childQueues so this can't
+ // be called if already within an iterator for the childQueues. Like
+ // from assignContainersToChildQueues.
+ if (sortQueues) {
+ // reinsert the updated queue
+ // NOTE(review): the child is located with equals() and moved via
+ // iterator remove + add rather than childQueues.remove(obj), presumably
+ // because childQueues is ordered by a key (e.g. usage) that has just
+ // changed for this child. Confirm before simplifying.
+ for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
+ CSQueue csqueue = iter.next();
+ if (csqueue.equals(completedChildQueue)) {
+ iter.remove();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Re-sorting completed queue: " + csqueue);
+ }
+ childQueues.add(csqueue);
+ break;
+ }
+ }
+ }
+
+ // If we skipped sort queue this time, we need to resort queues to make
+ // sure we allocate from least usage (or order defined by queue policy)
+ // queues.
+ needToResortQueuesAtNextAllocation = !sortQueues;
+ }
+
+  /**
+   * Applies a container decrease request to this parent queue, then forwards
+   * it to this queue's own parent (if any).
+   *
+   * <p>The released amount is the negation of the request's delta capacity,
+   * which is negative for a decrease request. The release is flagged as a
+   * resource change ({@code changeResource = true}). This is the same way
+   * LeafQueue#decreaseContainer flags it, and mirrors the allocation path,
+   * which passes {@code isIncreasedAllocation()}. The container keeps running,
+   * so it should not be booked in this queue as a completed container.
+   *
+   * @param clusterResource total cluster resource
+   * @param decreaseRequest the decrease request being applied
+   * @param app application that owns the container being decreased
+   */
+  @Override
+  public void decreaseContainer(Resource clusterResource,
+      SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) {
+    // delta capacity is negative when it's a decrease request
+    Resource absDeltaCapacity =
+        Resources.negate(decreaseRequest.getDeltaCapacity());
+
+    // changeResource = true: a live container is resized, not completed.
+    internalReleaseResource(clusterResource,
+        csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, true,
+        null, false);
+
+    // Inform the parent
+    if (parent != null) {
+      parent.decreaseContainer(clusterResource, decreaseRequest, app);
+    }
+  }
+
+  /**
+   * Releases, in this parent queue, the resource reserved for increasing
+   * {@code rmContainer}, then forwards the release to this queue's parent.
+   *
+   * <p>The release is flagged as a resource change ({@code changeResource =
+   * true}), consistent with LeafQueue#unreserveIncreasedContainer, which
+   * releases the same reservation with its change flag set. Passing
+   * {@code false} here would presumably book the release as a container
+   * completion in this queue but not in the leaf, letting their accounting
+   * drift apart.
+   *
+   * @param clusterResource total cluster resource
+   * @param app application owning the reservation; nothing is done if null
+   * @param node node on which the increase was reserved
+   * @param rmContainer container whose increase reservation is released
+   */
+  @Override
+  public void unreserveIncreasedContainer(Resource clusterResource,
+      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
+    if (app == null) {
+      return;
+    }
+
+    internalReleaseResource(clusterResource, node,
+        rmContainer.getReservedResource(), true, null, false);
+
+    // Inform the parent
+    if (parent != null) {
+      parent.unreserveIncreasedContainer(clusterResource, app, node,
+          rmContainer);
+    }
+  }
+
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
@@ -623,40 +693,9 @@ public class ParentQueue extends AbstractCSQueue {
RMContainerEventType event, CSQueue completedChildQueue,
boolean sortQueues) {
if (application != null) {
- // Careful! Locking order is important!
- // Book keeping
- synchronized (this) {
- super.releaseResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getPartition());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
- }
-
- // Note that this is using an iterator on the childQueues so this can't
- // be called if already within an iterator for the childQueues. Like
- // from assignContainersToChildQueues.
- if (sortQueues) {
- // reinsert the updated queue
- for (Iterator<CSQueue> iter = childQueues.iterator();
- iter.hasNext();) {
- CSQueue csqueue = iter.next();
- if(csqueue.equals(completedChildQueue)) {
- iter.remove();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Re-sorting completed queue: " + csqueue);
- }
- childQueues.add(csqueue);
- break;
- }
- }
- }
-
- // If we skipped sort queue this time, we need to resort queues to make
- // sure we allocate from least usage (or order defined by queue policy)
- // queues.
- needToResortQueuesAtNextAllocation = !sortQueues;
- }
+ internalReleaseResource(clusterResource, node,
+ rmContainer.getContainer().getResource(), false, completedChildQueue,
+ sortQueues);
// Inform the parent
if (parent != null) {
@@ -698,7 +737,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getPartition());
+ .getResource(), node.getPartition(), false);
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -726,7 +765,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getPartition());
+ .getResource(), node.getPartition(), false);
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -746,7 +785,7 @@ public class ParentQueue extends AbstractCSQueue {
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
- node.getPartition());
+ node.getPartition(), false);
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
new file mode 100644
index 0000000..b986b1f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * For an application, resource limits and resource requests, decide how to
+ * allocate container. This is to make application resource allocation logic
+ * extensible.
+ */
+public abstract class AbstractContainerAllocator {
+  private static final Log LOG =
+      LogFactory.getLog(AbstractContainerAllocator.class);
+
+  FiCaSchedulerApp application;
+  final ResourceCalculator rc;
+  final RMContext rmContext;
+
+  public AbstractContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    this.application = application;
+    this.rc = rc;
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Translates the result of one allocation attempt into a
+   * {@link CSAssignment}.
+   *
+   * <p>The assignment is marked skipped when the result state is
+   * {@code APP_SKIPPED}. It always carries the result's container-to-be-
+   * unreserved as its excess reservation. If the result allocates more than
+   * {@code Resources.none()}, the assignment also records that amount and
+   * the node type, and then:
+   * <ul>
+   * <li>{@code RESERVED}: records reservation details and bumps the
+   * reservation count and reserved resource;</li>
+   * <li>{@code ALLOCATED}: informs the leaf queue's ordering policy, records
+   * allocation details, bumps the allocation count and allocated resource,
+   * and marks the reservation fulfilled when {@code rmContainer} is
+   * non-null.</li>
+   * </ul>
+   *
+   * @param clusterResource total cluster resource, used for comparisons
+   * @param result outcome of the allocation attempt
+   * @param rmContainer presumably the reserved container the attempt was made
+   *          against, or null; a non-null value marks a fulfilled reservation
+   * @return the assignment describing the attempt
+   */
+  protected CSAssignment getCSAssignmentFromAllocateResult(
+      Resource clusterResource, ContainerAllocation result,
+      RMContainer rmContainer) {
+    // Handle skipped
+    boolean skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment assignment = new CSAssignment(skipped);
+    assignment.setApplication(application);
+
+    // Handle excess reservation
+    assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+    // If we allocated something
+    if (Resources.greaterThan(rc, clusterResource,
+        result.getResourceToBeAllocated(), Resources.none())) {
+      Resource allocatedResource = result.getResourceToBeAllocated();
+      Container updatedContainer = result.getUpdatedContainer();
+      // Identify the owning leaf queue explicitly. This code moved out of
+      // LeafQueue, where "this" was the queue; here "this" is the allocator.
+      String queuePath = application.getCSLeafQueue().getQueuePath();
+
+      assignment.setResource(allocatedResource);
+      assignment.setType(result.getContainerNodeType());
+
+      if (result.getAllocationState() == AllocationState.RESERVED) {
+        // This is a reserved container
+        LOG.info("Reserved container " + " application="
+            + application.getApplicationId() + " resource=" + allocatedResource
+            + " queue=" + queuePath + " cluster=" + clusterResource);
+        assignment.getAssignmentInformation().addReservationDetails(
+            updatedContainer.getId(), queuePath);
+        assignment.getAssignmentInformation().incrReservations();
+        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+            allocatedResource);
+      } else if (result.getAllocationState() == AllocationState.ALLOCATED) {
+        // This is a new container
+        LOG.info("assignedContainer" + " application attempt="
+            + application.getApplicationAttemptId() + " container="
+            + updatedContainer.getId() + " queue=" + queuePath
+            + " clusterResource=" + clusterResource);
+
+        // Inform the ordering policy
+        application
+            .getCSLeafQueue()
+            .getOrderingPolicy()
+            .containerAllocated(application,
+                application.getRMContainer(updatedContainer.getId()));
+
+        assignment.getAssignmentInformation().addAllocationDetails(
+            updatedContainer.getId(), queuePath);
+        assignment.getAssignmentInformation().incrAllocations();
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            allocatedResource);
+
+        if (rmContainer != null) {
+          assignment.setFulfilledReservation(true);
+        }
+      }
+    }
+
+    return assignment;
+  }
+
+  /**
+   * Tries to allocate (or reserve) a container for the application on
+   * {@code node}. Implementations need to handle the following:
+   *
+   * <ul>
+   * <li>Select request: Select a request to allocate. E.g. select a resource
+   * request based on requirement/priority/locality.</li>
+   * <li>Check if a given resource can be allocated based on resource
+   * availability</li>
+   * <li>Do allocation: this will decide/create allocated/reserved
+   * container, this will also update metrics</li>
+   * </ul>
+   *
+   * @param clusterResource total cluster resource
+   * @param node node being scheduled
+   * @param schedulingMode scheduling mode for this attempt
+   * @param resourceLimits resource limits applicable to this attempt
+   * @param reservedContainer container already reserved on {@code node} for
+   *          this attempt, or null
+   * @return the resulting assignment
+   */
+  public abstract CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, RMContainer reservedContainer);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index 6e296cd..3be8e0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -18,13 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -33,118 +30,50 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-/**
- * For an application, resource limits and resource requests, decide how to
- * allocate container. This is to make application resource allocation logic
- * extensible.
- */
-public abstract class ContainerAllocator {
- private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);
+public class ContainerAllocator extends AbstractContainerAllocator {
+ AbstractContainerAllocator increaseContainerAllocator;
+ AbstractContainerAllocator regularContainerAllocator;
- FiCaSchedulerApp application;
- final ResourceCalculator rc;
- final RMContext rmContext;
-
public ContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
- this.application = application;
- this.rc = rc;
- this.rmContext = rmContext;
- }
+ super(application, rc, rmContext);
- protected boolean checkHeadroom(Resource clusterResource,
- ResourceLimits currentResourceLimits, Resource required,
- FiCaSchedulerNode node) {
- // If headroom + currentReservation < required, we cannot allocate this
- // require
- Resource resourceCouldBeUnReserved = application.getCurrentReservation();
- if (!application.getCSLeafQueue().getReservationContinueLooking()
- || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
- // If we don't allow reservation continuous looking, OR we're looking at
- // non-default node partition, we won't allow to unreserve before
- // allocation.
- resourceCouldBeUnReserved = Resources.none();
- }
- return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
- currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
- required);
+ increaseContainerAllocator =
+ new IncreaseContainerAllocator(application, rc, rmContext);
+ regularContainerAllocator =
+ new RegularContainerAllocator(application, rc, rmContext);
}
- protected CSAssignment getCSAssignmentFromAllocateResult(
- Resource clusterResource, ContainerAllocation result,
- RMContainer rmContainer) {
- // Handle skipped
- boolean skipped =
- (result.getAllocationState() == AllocationState.APP_SKIPPED);
- CSAssignment assignment = new CSAssignment(skipped);
- assignment.setApplication(application);
-
- // Handle excess reservation
- assignment.setExcessReservation(result.getContainerToBeUnreserved());
-
- // If we allocated something
- if (Resources.greaterThan(rc, clusterResource,
- result.getResourceToBeAllocated(), Resources.none())) {
- Resource allocatedResource = result.getResourceToBeAllocated();
- Container updatedContainer = result.getUpdatedContainer();
-
- assignment.setResource(allocatedResource);
- assignment.setType(result.getContainerNodeType());
-
- if (result.getAllocationState() == AllocationState.RESERVED) {
- // This is a reserved container
- LOG.info("Reserved container " + " application="
- + application.getApplicationId() + " resource=" + allocatedResource
- + " queue=" + this.toString() + " cluster=" + clusterResource);
- assignment.getAssignmentInformation().addReservationDetails(
- updatedContainer.getId(),
- application.getCSLeafQueue().getQueuePath());
- assignment.getAssignmentInformation().incrReservations();
- Resources.addTo(assignment.getAssignmentInformation().getReserved(),
- allocatedResource);
- } else if (result.getAllocationState() == AllocationState.ALLOCATED){
- // This is a new container
- // Inform the ordering policy
- LOG.info("assignedContainer" + " application attempt="
- + application.getApplicationAttemptId() + " container="
- + updatedContainer.getId() + " queue=" + this + " clusterResource="
- + clusterResource);
-
- application
- .getCSLeafQueue()
- .getOrderingPolicy()
- .containerAllocated(application,
- application.getRMContainer(updatedContainer.getId()));
-
- assignment.getAssignmentInformation().addAllocationDetails(
- updatedContainer.getId(),
- application.getCSLeafQueue().getQueuePath());
- assignment.getAssignmentInformation().incrAllocations();
- Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- allocatedResource);
-
- if (rmContainer != null) {
- assignment.setFulfilledReservation(true);
- }
+ @Override
+ public CSAssignment assignContainers(Resource clusterResource,
+ FiCaSchedulerNode node, SchedulingMode schedulingMode,
+ ResourceLimits resourceLimits, RMContainer reservedContainer) {
+ if (reservedContainer != null) {
+ if (reservedContainer.getState() == RMContainerState.RESERVED) {
+ // It's a regular container
+ return regularContainerAllocator.assignContainers(clusterResource,
+ node, schedulingMode, resourceLimits, reservedContainer);
+ } else {
+ // It's a increase container
+ return increaseContainerAllocator.assignContainers(clusterResource,
+ node, schedulingMode, resourceLimits, reservedContainer);
+ }
+ } else {
+ /*
+ * Try to allocate increase container first, and if we failed to allocate
+ * anything, we will try to allocate regular container
+ */
+ CSAssignment assign =
+ increaseContainerAllocator.assignContainers(clusterResource, node,
+ schedulingMode, resourceLimits, null);
+ if (Resources.greaterThan(rc, clusterResource, assign.getResource(),
+ Resources.none())) {
+ return assign;
}
+
+ return regularContainerAllocator.assignContainers(clusterResource, node,
+ schedulingMode, resourceLimits, null);
}
-
- return assignment;
}
-
- /**
- * allocate needs to handle following stuffs:
- *
- * <ul>
- * <li>Select request: Select a request to allocate. E.g. select a resource
- * request based on requirement/priority/locality.</li>
- * <li>Check if a given resource can be allocated based on resource
- * availability</li>
- * <li>Do allocation: this will decide/create allocated/reserved
- * container, this will also update metrics</li>
- * </ul>
- */
- public abstract CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, SchedulingMode schedulingMode,
- ResourceLimits resourceLimits, RMContainer reservedContainer);
-}
\ No newline at end of file
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
new file mode 100644
index 0000000..9350adc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Allocator that serves container resource <em>increase</em> requests of a
+ * single application attempt, one node at a time.
+ *
+ * <p>Each call either retries a previously reserved increase (when a reserved
+ * container is passed in) or walks the application's pending increase
+ * requests for the node in priority order, stopping at the first request
+ * that is allocated or reserved.
+ */
+public class IncreaseContainerAllocator extends AbstractContainerAllocator {
+  private static final Log LOG =
+      LogFactory.getLog(IncreaseContainerAllocator.class);
+
+  public IncreaseContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    super(application, rc, rmContext);
+  }
+
+  /**
+   * Quick check whether the headroom in {@code currentResourceLimits} can
+   * accommodate {@code required}.
+   *
+   * @return true if the headroom is greater than or equal to required
+   */
+  private boolean checkHeadroom(Resource clusterResource,
+      ResourceLimits currentResourceLimits, Resource required) {
+    return Resources.greaterThanOrEqual(rc, clusterResource,
+        currentResourceLimits.getHeadroom(), required);
+  }
+
+  /**
+   * Builds the assignment for an increase request that could not be satisfied
+   * now but has been reserved on the node.
+   */
+  private CSAssignment createReservedIncreasedCSAssignment(
+      SchedContainerChangeRequest request) {
+    CSAssignment assignment =
+        new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
+            application, false, false);
+    Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+        request.getDeltaCapacity());
+    assignment.getAssignmentInformation().incrReservations();
+    assignment.getAssignmentInformation().addReservationDetails(
+        request.getContainerId(), application.getCSLeafQueue().getQueuePath());
+    assignment.setIncreasedAllocation(true);
+
+    LOG.info("Reserved increase container request:" + request.toString());
+
+    return assignment;
+  }
+
+  /**
+   * Builds the assignment for a granted increase request and notifies the
+   * leaf queue's ordering policy about the allocation.
+   *
+   * @param fromReservation whether the increase was satisfied from an
+   *          existing reservation
+   */
+  private CSAssignment createSuccessfullyIncreasedCSAssignment(
+      SchedContainerChangeRequest request, boolean fromReservation) {
+    CSAssignment assignment =
+        new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
+            application, false, fromReservation);
+    Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+        request.getDeltaCapacity());
+    assignment.getAssignmentInformation().incrAllocations();
+    assignment.getAssignmentInformation().addAllocationDetails(
+        request.getContainerId(), application.getCSLeafQueue().getQueuePath());
+    assignment.setIncreasedAllocation(true);
+
+    // notify application
+    application
+        .getCSLeafQueue()
+        .getOrderingPolicy()
+        .containerAllocated(application,
+            application.getRMContainer(request.getContainerId()));
+
+    LOG.info("Approved increase container request:" + request.toString()
+        + " fromReservation=" + fromReservation);
+
+    return assignment;
+  }
+
+  /**
+   * Tries to satisfy an increase request whose delta is already reserved on
+   * {@code node}. If the delta now fits into the node's available resource the
+   * reservation is released and the increase is applied to the application and
+   * the node; otherwise the reservation is kept and the request is skipped.
+   */
+  private CSAssignment allocateIncreaseRequestFromReservedContainer(
+      SchedulerNode node, Resource cluster,
+      SchedContainerChangeRequest increaseRequest) {
+    if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
+        node.getAvailableResource())) {
+      // OK, we can allocate this increase request
+      // Unreserve it first
+      application.unreserve(increaseRequest.getPriority(),
+          (FiCaSchedulerNode) node, increaseRequest.getRMContainer());
+
+      // Notify application
+      application.increaseContainer(increaseRequest);
+
+      // Notify node
+      node.increaseContainer(increaseRequest.getContainerId(),
+          increaseRequest.getDeltaCapacity());
+
+      return createSuccessfullyIncreasedCSAssignment(increaseRequest, true);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to allocate reserved increase request:"
+            + increaseRequest.toString()
+            + ". There's no enough available resource");
+      }
+
+      // We still cannot allocate this container, will wait for next turn
+      return CSAssignment.SKIP_ASSIGNMENT;
+    }
+  }
+
+  /**
+   * Tries to satisfy a fresh increase request on {@code node}: applies it
+   * immediately if the delta fits into the node's available resource,
+   * otherwise attempts to reserve the delta on the node.
+   *
+   * @return an allocated or reserved assignment, or
+   *         {@link CSAssignment#SKIP_ASSIGNMENT} if the reservation failed
+   */
+  private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node,
+      Resource cluster, SchedContainerChangeRequest increaseRequest) {
+    if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(),
+        node.getAvailableResource())) {
+      // Notify node
+      node.increaseContainer(increaseRequest.getContainerId(),
+          increaseRequest.getDeltaCapacity());
+
+      // OK, we can allocate this increase request
+      // Notify application
+      application.increaseContainer(increaseRequest);
+      return createSuccessfullyIncreasedCSAssignment(increaseRequest, false);
+    } else {
+      boolean reservationSucceeded =
+          application.reserveIncreasedContainer(increaseRequest.getPriority(),
+              node, increaseRequest.getRMContainer(),
+              increaseRequest.getDeltaCapacity());
+
+      if (reservationSucceeded) {
+        // We cannot allocate this container, but since queue capacity /
+        // user-limit matches, we can reserve this container on this node.
+        return createReservedIncreasedCSAssignment(increaseRequest);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Reserve increase request=" + increaseRequest.toString()
+              + " failed. Skipping..");
+        }
+        return CSAssignment.SKIP_ASSIGNMENT;
+      }
+    }
+  }
+
+  /**
+   * Allocates or reserves at most one container increase on {@code node}.
+   *
+   * <p>When {@code reservedContainer} is null, pending increase requests on
+   * the node are scanned priority by priority. A request is skipped if the
+   * headroom cannot cover its delta; it is dropped if its container is no
+   * longer RUNNING or its target capacity exceeds the node's total resource.
+   *
+   * <p>When {@code reservedContainer} is non-null, only its reserved increase
+   * is retried. The reservation is handed back (as an excess-reservation
+   * assignment) if the request is gone, the container is no longer RUNNING,
+   * or the requested delta no longer matches the reserved resource.
+   *
+   * @return the resulting assignment; {@link CSAssignment#SKIP_ASSIGNMENT}
+   *         when nothing was allocated, reserved or unreserved
+   */
+  @Override
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, RMContainer reservedContainer) {
+    AppSchedulingInfo sinfo = application.getAppSchedulingInfo();
+    NodeId nodeId = node.getNodeID();
+
+    if (reservedContainer == null) {
+      // Do we have increase request on this node?
+      if (!sinfo.hasIncreaseRequest(nodeId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip allocating increase request since we don't have any"
+              + " increase request on this node=" + node.getNodeID());
+        }
+
+        return CSAssignment.SKIP_ASSIGNMENT;
+      }
+
+      // Check if we need to unreserve something, note that we don't support
+      // continuousReservationLooking now. TODO, need think more about how to
+      // support it.
+      boolean shouldUnreserve =
+          Resources.greaterThan(rc, clusterResource,
+              resourceLimits.getAmountNeededUnreserve(), Resources.none());
+
+      // Check if we can allocate minimum resource according to headroom
+      boolean cannotAllocateAnything =
+          !checkHeadroom(clusterResource, resourceLimits, rmContext
+              .getScheduler().getMinimumResourceCapability());
+
+      // Skip the app if we failed either of above check
+      if (cannotAllocateAnything || shouldUnreserve) {
+        if (LOG.isDebugEnabled()) {
+          if (shouldUnreserve) {
+            LOG.debug("Cannot continue since we have to unreserve some resource"
+                + ", now increase container allocation doesn't "
+                + "support continuous reservation looking..");
+          }
+          if (cannotAllocateAnything) {
+            LOG.debug("We cannot allocate anything because of low headroom, "
+                + "headroom=" + resourceLimits.getHeadroom());
+          }
+        }
+
+        return CSAssignment.SKIP_ASSIGNMENT;
+      }
+
+      CSAssignment assigned = null;
+
+      /*
+       * Loop each priority, and containerId. Container priority is not
+       * equivalent to request priority, application master can run an
+       * important task on a less prioritized container.
+       *
+       * So behavior here is, we still try to increase container with higher
+       * priority, but will skip increase request and move to next increase
+       * request if queue-limit or user-limit aren't satisfied
+       */
+      for (Priority priority : application.getPriorities()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Looking at increase request for application="
+              + application.getApplicationAttemptId() + " priority="
+              + priority);
+        }
+
+        /*
+         * If we have multiple to-be-increased containers under same priority
+         * on a same host, we will try to increase earlier launched container
+         * first. And again - we will skip a request and move to next if it
+         * cannot be allocated.
+         */
+        Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap =
+            sinfo.getIncreaseRequests(nodeId, priority);
+
+        // We don't have more increase request on this priority, skip..
+        if (null == increaseRequestMap) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("There's no increase request for "
+                + application.getApplicationAttemptId() + " priority="
+                + priority);
+          }
+          continue;
+        }
+        Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter =
+            increaseRequestMap.entrySet().iterator();
+        List<SchedContainerChangeRequest> toBeRemovedRequests =
+            new ArrayList<>();
+
+        while (iter.hasNext()) {
+          Entry<ContainerId, SchedContainerChangeRequest> entry =
+              iter.next();
+          SchedContainerChangeRequest increaseRequest =
+              entry.getValue();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "Looking at increase request=" + increaseRequest.toString());
+          }
+
+          boolean headroomSatisfied = checkHeadroom(clusterResource,
+              resourceLimits, increaseRequest.getDeltaCapacity());
+          if (!headroomSatisfied) {
+            // skip if doesn't satisfy headroom limit
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(" Headroom is not satisfied, skip..");
+            }
+            continue;
+          }
+
+          RMContainer rmContainer = increaseRequest.getRMContainer();
+          if (rmContainer.getContainerState() != ContainerState.RUNNING) {
+            // if the container is not running, we should remove the
+            // increaseRequest and continue;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(" Container is not running any more, skip...");
+            }
+            toBeRemovedRequests.add(increaseRequest);
+            continue;
+          }
+
+          if (!Resources.fitsIn(rc, clusterResource,
+              increaseRequest.getTargetCapacity(), node.getTotalResource())) {
+            // if the target capacity is more than what the node can offer, we
+            // will simply remove and skip it.
+            // The reason of doing check here instead of adding increase request
+            // to scheduler because node's resource could be updated after
+            // request added.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(" Target capacity is more than what node can offer,"
+                  + " node.resource=" + node.getTotalResource());
+            }
+            toBeRemovedRequests.add(increaseRequest);
+            continue;
+          }
+
+          // Try to allocate the increase request
+          assigned =
+              allocateIncreaseRequest(node, clusterResource, increaseRequest);
+          if (!assigned.getSkipped()) {
+            // When we don't skip this request, which means we either allocated
+            // OR reserved this request. We will break
+            break;
+          }
+        }
+
+        // Drop requests found invalid above (container not running, or
+        // target capacity larger than the node's total resource)
+        if (!toBeRemovedRequests.isEmpty()) {
+          for (SchedContainerChangeRequest req : toBeRemovedRequests) {
+            sinfo.removeIncreaseRequest(req.getNodeId(), req.getPriority(),
+                req.getContainerId());
+          }
+        }
+
+        // We already allocated something. assigned is still null when every
+        // request seen so far was skipped or dropped before an allocation
+        // attempt, so guard against an NPE here.
+        if (assigned != null && !assigned.getSkipped()) {
+          break;
+        }
+      }
+
+      return assigned == null ? CSAssignment.SKIP_ASSIGNMENT : assigned;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to allocate reserved increase container request..");
+      }
+
+      // We already reserved this increase container
+      SchedContainerChangeRequest request =
+          sinfo.getIncreaseRequest(nodeId, reservedContainer.getContainer()
+              .getPriority(), reservedContainer.getContainerId());
+
+      // We will cancel the reservation any of following happens
+      // - Container finished
+      // - No increase request needed
+      // - Target resource updated
+      if (null == request
+          || reservedContainer.getContainerState() != ContainerState.RUNNING
+          || (!Resources.equals(reservedContainer.getReservedResource(),
+              request.getDeltaCapacity()))) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("We don't need reserved increase container request "
+              + "for container=" + reservedContainer.getContainerId()
+              + ". Unreserving and return...");
+        }
+
+        // We don't need this container now, just return excessive reservation
+        return new CSAssignment(application, reservedContainer);
+      }
+
+      return allocateIncreaseRequestFromReservedContainer(node, clusterResource,
+          request);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index dcb99ed..fd99d29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
* Allocate normal (new) containers, considers locality/label, etc. Using
* delayed scheduling mechanism to get better locality allocation.
*/
-public class RegularContainerAllocator extends ContainerAllocator {
+public class RegularContainerAllocator extends AbstractContainerAllocator {
private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
private ResourceRequest lastResourceRequest = null;
@@ -56,6 +56,25 @@ public class RegularContainerAllocator extends ContainerAllocator {
super(application, rc, rmContext);
}
+  /**
+   * Checks whether {@code required} fits into the headroom, optionally
+   * counting the application's current reservation as reclaimable.
+   *
+   * <p>The current reservation is only added to the headroom when the leaf
+   * queue allows continuous reservation looking and {@code node} belongs to
+   * the default (NO_LABEL) partition; in every other case nothing may be
+   * unreserved before allocating.
+   *
+   * @return true if headroom plus reclaimable reservation is at least required
+   */
+  private boolean checkHeadroom(Resource clusterResource,
+      ResourceLimits currentResourceLimits, Resource required,
+      FiCaSchedulerNode node) {
+    Resource currentReservation = application.getCurrentReservation();
+    boolean mayUnreserveFirst =
+        application.getCSLeafQueue().getReservationContinueLooking()
+            && node.getPartition().equals(RMNodeLabelsManager.NO_LABEL);
+    Resource reclaimable =
+        mayUnreserveFirst ? currentReservation : Resources.none();
+    Resource effectiveHeadroom =
+        Resources.add(currentResourceLimits.getHeadroom(), reclaimable);
+    return Resources.greaterThanOrEqual(rc, clusterResource,
+        effectiveHeadroom, required);
+  }
+
+
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority) {
@@ -97,8 +116,9 @@ public class RegularContainerAllocator extends ContainerAllocator {
// Is the node-label-expression of this offswitch resource request
// matches the node's label?
// If not match, jump to next priority.
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest,
- node.getPartition(), schedulingMode)) {
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
+ anyRequest.getNodeLabelExpression(), node.getPartition(),
+ schedulingMode)) {
return ContainerAllocation.PRIORITY_SKIPPED;
}
@@ -388,8 +408,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
}
// check if the resource request can access the label
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
- node.getPartition(), schedulingMode)) {
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
+ request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 300cba9..e97da24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -83,7 +84,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private ResourceScheduler scheduler;
- private ContainerAllocator containerAllocator;
+ private AbstractContainerAllocator containerAllocator;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -118,7 +119,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
rc = scheduler.getResourceCalculator();
}
- containerAllocator = new RegularContainerAllocator(this, rc, rmContext);
+ containerAllocator = new ContainerAllocator(this, rc, rmContext);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -207,22 +208,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return rmContainer;
}
- public boolean unreserve(Priority priority,
+ /**
+ * Unreserve {@code rmContainer} at {@code priority} on {@code node}.
+ * Any pending increase reservation on the container is cancelled first;
+ * node and queue reserved metrics are only updated when internalUnreserve
+ * reports that a reservation was removed.
+ *
+ * @return true if internalUnreserve removed a reservation
+ */
+ public synchronized boolean unreserve(Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer) {
+ // Cancel the increase reservation (if the container has one)
+ rmContainer.cancelIncreaseReservation();
+
// Done with the reservation?
- if (unreserve(node, priority)) {
+ if (internalUnreserve(node, priority)) {
node.unreserveResource(this);
// Update reserved metrics
queue.getMetrics().unreserveResource(getUser(),
- rmContainer.getContainer().getResource());
+ rmContainer.getReservedResource());
return true;
}
return false;
}
- @VisibleForTesting
- public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
+ private boolean internalUnreserve(FiCaSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
@@ -241,7 +244,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Reset the re-reservation count
resetReReservations(priority);
- Resource resource = reservedContainer.getContainer().getResource();
+ Resource resource = reservedContainer.getReservedResource();
this.attemptResourceUsage.decReserved(node.getPartition(), resource);
LOG.info("Application " + getApplicationId() + " unreserved "
@@ -311,13 +314,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
ResourceRequest rr = ResourceRequest.newInstance(
Priority.UNDEFINED, ResourceRequest.ANY,
minimumAllocation, numCont);
- ContainersAndNMTokensAllocation allocation =
- pullNewlyAllocatedContainersAndNMTokens();
+ List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
+ List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
+ List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
+ List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
Resource headroom = getHeadroom();
setApplicationHeadroomForMetrics(headroom);
- return new Allocation(allocation.getContainerList(), headroom, null,
- currentContPreemption, Collections.singletonList(rr),
- allocation.getNMTokenList());
+ return new Allocation(newlyAllocatedContainers, headroom, null,
+ currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
+ newlyIncreasedContainers, newlyDecreasedContainers);
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
@@ -332,15 +337,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
NodeId nodeId = entry.getKey();
- Resource containerResource = entry.getValue().getContainer().getResource();
+ RMContainer reservedContainer = entry.getValue();
+ if (reservedContainer.hasIncreaseReservation()) {
+ // Currently, only regular container allocation supports continuous
+ // reservation looking, we don't support canceling increase request
+ // reservation when allocating regular container.
+ continue;
+ }
+
+ Resource reservedResource = reservedContainer.getReservedResource();
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
- if (Resources.lessThanOrEqual(rc, clusterResource,
- resourceNeedUnreserve, containerResource)) {
+ if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
+ reservedResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
- + containerResource
+ + reservedResource
+ " in order to allocate container with size: " + resourceNeedUnreserve);
}
return nodeId;
@@ -374,6 +387,25 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
this.headroomProvider =
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
}
+
+  /**
+   * Reserves {@code reservedResource} on {@code node} for a pending increase
+   * of {@code rmContainer}.
+   *
+   * <p>The application-level reservation is attempted first; queue metrics
+   * and the node's reservation are only updated when it succeeds.
+   *
+   * @return true if the reservation was recorded, false otherwise
+   */
+  public boolean reserveIncreasedContainer(Priority priority,
+      FiCaSchedulerNode node,
+      RMContainer rmContainer, Resource reservedResource) {
+    boolean reservedOnApp = super.reserveIncreasedContainer(node, priority,
+        rmContainer, reservedResource);
+    if (!reservedOnApp) {
+      return false;
+    }
+
+    // Account the reservation in queue metrics, then record it on the node
+    queue.getMetrics().reserveResource(getUser(), reservedResource);
+    node.reserveResource(this, priority, rmContainer);
+    return true;
+  }
public void reserve(Priority priority,
FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index a083272..56e72d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -19,7 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -32,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -68,7 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -914,7 +923,9 @@ public class FairScheduler extends
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
@@ -973,18 +984,17 @@ public class FairScheduler extends
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
- ContainersAndNMTokensAllocation allocation =
- application.pullNewlyAllocatedContainersAndNMTokens();
-
+ List<Container> newlyAllocatedContainers =
+ application.pullNewlyAllocatedContainers();
// Record container allocation time
- if (!(allocation.getContainerList().isEmpty())) {
+ if (!(newlyAllocatedContainers.isEmpty())) {
application.recordContainerAllocationTime(getClock().getTime());
}
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
- return new Allocation(allocation.getContainerList(), headroom,
- preemptionContainerIds, null, null, allocation.getNMTokenList());
+ return new Allocation(newlyAllocatedContainers, headroom,
+ preemptionContainerIds, null, null, application.pullUpdatedNMTokens());
}
}
@@ -1725,4 +1735,11 @@ public class FairScheduler extends
}
return targetQueueName;
}
+
+ /**
+ * Container resource decrease hook. This FairScheduler override is a
+ * no-op: the given decrease request is not acted upon here.
+ */
+ @Override
+ protected void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest,
+ SchedulerApplicationAttempt attempt) {
+ // TODO: implement container resource decrease for FairScheduler; until
+ // then this method intentionally does nothing.
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 99760df..2ec2311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -76,7 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -310,9 +311,11 @@ public class FifoScheduler extends
}
@Override
- public Allocation allocate(
- ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
- List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+ List<ResourceRequest> ask, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
LOG.error("Calling allocate on removed " +
@@ -364,12 +367,10 @@ public class FifoScheduler extends
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
- ContainersAndNMTokensAllocation allocation =
- application.pullNewlyAllocatedContainersAndNMTokens();
Resource headroom = application.getHeadroom();
application.setApplicationHeadroomForMetrics(headroom);
- return new Allocation(allocation.getContainerList(), headroom, null,
- null, null, allocation.getNMTokenList());
+ return new Allocation(application.pullNewlyAllocatedContainers(),
+ headroom, null, null, null, application.pullUpdatedNMTokens());
}
}
@@ -1005,4 +1006,12 @@ public class FifoScheduler extends
public Resource getUsedResource() {
return usedResource;
}
+
+ /**
+ * Overrides the superclass container-decrease hook for FifoScheduler.
+ * The body is empty, so any decrease request passed in here is dropped.
+ */
+ @Override
+ protected void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest,
+ SchedulerApplicationAttempt attempt) {
+ // NOTE(review): intentional no-op? FifoScheduler does not appear to
+ // implement container resizing; confirm before relying on it here.
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index e62f7d7..b536546 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -323,7 +323,7 @@ public class Application {
// Get resources from the ResourceManager
Allocation allocation = resourceManager.getResourceScheduler().allocate(
applicationAttemptId, new ArrayList<ResourceRequest>(ask),
- new ArrayList<ContainerId>(), null, null);
+ new ArrayList<ContainerId>(), null, null, null, null);
System.out.println("-=======" + applicationAttemptId);
System.out.println("----------" + resourceManager.getRMContext().getRMApps()
.get(applicationId).getRMAppAttempt(applicationAttemptId));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 5660b78..c325a65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -235,6 +236,14 @@ public class MockAM {
releases, null);
return allocate(req);
}
+
+ /**
+ * Issues a single allocate call whose only non-null payload is the given
+ * container increase and decrease requests.
+ *
+ * @param increaseRequests increase requests, forwarded unchanged
+ * @param decreaseRequests decrease requests, forwarded unchanged
+ * @return whatever {@code allocate(AllocateRequest)} returns
+ * @throws Exception propagated from {@code allocate(AllocateRequest)}
+ */
+ public AllocateResponse sendContainerResizingRequest(
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) throws Exception {
+ return allocate(AllocateRequest.newInstance(0, 0F, null, null, null,
+ increaseRequests, decreaseRequests));
+ }
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 53cb8d0..92f3edf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -231,6 +233,17 @@ public class MockNodes {
}
return CommonNodeLabelsManager.EMPTY_STRING_SET;
}
+
+ /** Mock implementation: leaves {@code response} untouched. */
+ @Override
+ public void updateNodeHeartbeatResponseForContainersDecreasing(
+ NodeHeartbeatResponse response) {
+ // Intentionally empty in this mock node.
+ }
+
+ /** Mock implementation: always reports that no containers were increased. */
+ @Override
+ public List<Container> pullNewlyIncreasedContainers() {
+ final List<Container> noneIncreased = Collections.emptyList();
+ return noneIncreased;
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/874cc98e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 249f093..7ce42f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -191,6 +191,19 @@ public class MockRM extends ResourceManager {
}
}
+ /**
+ * Polls the scheduler every 200 ms (at most 40 polls, roughly 8 seconds)
+ * until the container with the given id is known to the scheduler and is
+ * in {@code state}, then asserts that it got there.
+ *
+ * @param containerId container to watch
+ * @param state expected RMContainer state
+ * @throws Exception propagated from Thread.sleep (e.g. interruption)
+ */
+ public void waitForContainerState(ContainerId containerId,
+ RMContainerState state) throws Exception {
+ int polls = 0;
+ RMContainer container = getResourceScheduler().getRMContainer(containerId);
+ while ((container == null || container.getState() != state)
+ && polls++ < 40) {
+ System.out.println(
+ "Waiting for" + containerId + " state to be:" + state.name());
+ Thread.sleep(200);
+ // Re-fetch every poll: if the container was not registered yet, the
+ // initial null lookup would otherwise never be updated.
+ container = getResourceScheduler().getRMContainer(containerId);
+ }
+ Assert.assertNotNull("Container " + containerId + " not found", container);
+ Assert.assertEquals(state, container.getState());
+ }
+
public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
throws Exception {
int timeoutSecs = 0;