You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/02/28 18:41:59 UTC
[2/2] hadoop git commit: YARN-6216. Unify Container Resizing code
paths with Container Updates making it scheduler agnostic. (Arun Suresh via
wangda)
YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eac6b4c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eac6b4c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eac6b4c3
Branch: refs/heads/trunk
Commit: eac6b4c35c50e555c2f1b5f913bb2c4d839f1ff4
Parents: 480b4dd
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Feb 28 10:35:50 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Feb 28 10:35:50 2017 -0800
----------------------------------------------------------------------
.../sls/scheduler/ResourceSchedulerWrapper.java | 8 -
.../server/scheduler/SchedulerRequestKey.java | 12 +-
.../server/resourcemanager/RMServerUtils.java | 27 +-
.../rmcontainer/RMContainer.java | 4 -
.../RMContainerChangeResourceEvent.java | 44 ---
.../rmcontainer/RMContainerImpl.java | 46 ---
.../scheduler/AbstractYarnScheduler.java | 171 +++++++---
.../scheduler/AppSchedulingInfo.java | 283 +---------------
.../scheduler/ContainerUpdateContext.java | 193 ++++++++---
.../scheduler/SchedulerApplicationAttempt.java | 212 ++++--------
.../scheduler/SchedulerNode.java | 44 ---
.../scheduler/capacity/AbstractCSQueue.java | 13 +-
.../scheduler/capacity/CSQueue.java | 15 -
.../scheduler/capacity/CapacityScheduler.java | 121 +------
.../scheduler/capacity/LeafQueue.java | 152 +--------
.../scheduler/capacity/ParentQueue.java | 53 +--
.../capacity/allocator/ContainerAllocator.java | 31 +-
.../allocator/IncreaseContainerAllocator.java | 337 -------------------
.../common/ContainerAllocationProposal.java | 9 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 245 +++-----------
.../common/fica/FiCaSchedulerNode.java | 14 -
.../scheduler/fair/FairScheduler.java | 11 +-
.../scheduler/fifo/FifoScheduler.java | 8 -
.../scheduler/capacity/TestChildQueueOrder.java | 4 +-
.../capacity/TestContainerResizing.java | 134 +-------
.../capacity/TestIncreaseAllocationExpirer.java | 12 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../scheduler/capacity/TestParentQueue.java | 4 +-
28 files changed, 482 insertions(+), 1729 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 5517362..df8323a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -969,12 +969,4 @@ final public class ResourceSchedulerWrapper
return Priority.newInstance(0);
}
- @Override
- protected void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- // TODO Auto-generated method stub
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 02539ba..c4f37f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements
if (priorityCompare != 0) {
return priorityCompare;
}
- return Long.compare(allocationRequestId, o.getAllocationRequestId());
+ int allocReqCompare = Long.compare(
+ allocationRequestId, o.getAllocationRequestId());
+
+ if (allocReqCompare != 0) {
+ return allocReqCompare;
+ }
+
+ if (this.containerToUpdate != null && o.containerToUpdate != null) {
+ return (this.containerToUpdate.compareTo(o.containerToUpdate));
+ }
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e98141b..0aa7a2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -152,26 +152,16 @@ public class RMServerUtils {
if (msg == null) {
if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
(updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
- Resource original = rmContainer.getContainer().getResource();
- Resource target = updateReq.getCapability();
- if (Resources.fitsIn(target, original)) {
- // This is a decrease request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, false)) {
- updateRequests.getDecreaseRequests().add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
- } else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
- }
- } else {
- // This is an increase request
- if (validateIncreaseDecreaseRequest(rmContext, updateReq,
- maximumAllocation, true)) {
+ if (validateIncreaseDecreaseRequest(
+ rmContext, updateReq, maximumAllocation)) {
+ if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
updateRequests.getIncreaseRequests().add(updateReq);
- outstandingUpdate.add(updateReq.getContainerId());
} else {
- msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+ updateRequests.getDecreaseRequests().add(updateReq);
}
+ outstandingUpdate.add(updateReq.getContainerId());
+ } else {
+ msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
}
} else {
ExecutionType original = rmContainer.getExecutionType();
@@ -329,8 +319,7 @@ public class RMServerUtils {
// Sanity check and normalize target resource
private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
- UpdateContainerRequest request, Resource maximumAllocation,
- boolean increase) {
+ UpdateContainerRequest request, Resource maximumAllocation) {
if (request.getCapability().getMemorySize() < 0
|| request.getCapability().getMemorySize() > maximumAllocation
.getMemorySize()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 020764b..7ad381e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
String getNodeHttpAddress();
String getNodeLabelExpression();
-
- boolean hasIncreaseReservation();
-
- void cancelIncreaseReservation();
String getQueueName();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
deleted file mode 100644
index 920cfdb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class RMContainerChangeResourceEvent extends RMContainerEvent {
-
- final Resource targetResource;
- final boolean increase;
-
- public RMContainerChangeResourceEvent(ContainerId containerId,
- Resource targetResource, boolean increase) {
- super(containerId, RMContainerEventType.CHANGE_RESOURCE);
-
- this.targetResource = targetResource;
- this.increase = increase;
- }
-
- public Resource getTargetResource() {
- return targetResource;
- }
-
- public boolean isIncrease() {
- return increase;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 72ce1a0..12fbbea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -131,8 +131,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
- RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
- .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
RMContainerEventType.ACQUIRE_UPDATED_CONTAINER,
new ContainerAcquiredWhileRunningTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private boolean isAMContainer;
private List<ResourceRequest> resourceRequests;
- private volatile boolean hasIncreaseReservation = false;
// Only used for container resource increase and decrease. This is the
// resource to rollback to should container resource increase token expires.
private Resource lastConfirmedResource;
@@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
if (c != null) {
c.setNodeId(container.reservedNode);
}
-
- if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
- .contains(container.getState())) {
- // When container's state != NEW/RESERVED, it is an increase reservation
- container.hasIncreaseReservation = true;
- }
}
}
@@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
}
-
- private static final class ChangeResourceTransition extends BaseTransition {
-
- @Override
- public void transition(RMContainerImpl container, RMContainerEvent event) {
- RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
-
- Resource targetResource = changeEvent.getTargetResource();
- Resource lastConfirmedResource = container.lastConfirmedResource;
-
- if (!changeEvent.isIncrease()) {
- // Only unregister from the containerAllocationExpirer when target
- // resource is less than or equal to the last confirmed resource.
- if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
- container.lastConfirmedResource = targetResource;
- container.containerAllocationExpirer.unregister(
- new AllocationExpirationInfo(event.getContainerId()));
- }
- }
-
- container.container.setResource(targetResource);
-
- // We reach here means we either allocated increase reservation OR
- // decreased container, reservation will be cancelled anyway.
- container.hasIncreaseReservation = false;
- }
- }
private static class FinishedTransition extends BaseTransition {
@@ -857,16 +821,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
return -1;
}
- @Override
- public boolean hasIncreaseReservation() {
- return hasIncreaseReservation;
- }
-
- @Override
- public void cancelIncreaseReservation() {
- hasIncreaseReservation = false;
- }
-
public void setQueueName(String queueName) {
this.queueName = queueName;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ce6d2a2..213839d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
completedContainerInternal(rmContainer, containerStatus, event);
+ completeOustandingUpdatesWhichAreReserved(
+ rmContainer, containerStatus, event);
} else {
ContainerId containerId = rmContainer.getContainerId();
// Inform the container
@@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler
recoverResourceRequestForContainer(rmContainer);
}
+ // Optimization:
+ // Check if there are in-flight container updates and complete the
+ // associated temp containers. These are removed when the app completes,
+ // but removing them when the actual container completes would allow the
+ // scheduler to reallocate those resources sooner.
+ private void completeOustandingUpdatesWhichAreReserved(
+ RMContainer rmContainer, ContainerStatus containerStatus,
+ RMContainerEventType event) {
+ N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
+ if (schedulerNode != null &&
+ schedulerNode.getReservedContainer() != null) {
+ RMContainer resContainer = schedulerNode.getReservedContainer();
+ if (resContainer.getReservedSchedulerKey() != null) {
+ ContainerId containerToUpdate = resContainer
+ .getReservedSchedulerKey().getContainerToUpdate();
+ if (containerToUpdate != null &&
+ containerToUpdate.equals(containerStatus.getContainerId())) {
+ completedContainerInternal(resContainer,
+ ContainerStatus.newInstance(resContainer.getContainerId(),
+ containerStatus.getState(), containerStatus
+ .getDiagnostics(),
+ containerStatus.getExitStatus()), event);
+ }
+ }
+ }
+ }
+
// clean up a completed container
protected abstract void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);
@@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler
}
}
- protected void decreaseContainers(
- List<UpdateContainerRequest> decreaseRequests,
- SchedulerApplicationAttempt attempt) {
- if (null == decreaseRequests || decreaseRequests.isEmpty()) {
- return;
- }
- // Pre-process decrease requests
- List<SchedContainerChangeRequest> schedDecreaseRequests =
- createSchedContainerChangeRequests(decreaseRequests, false);
- for (SchedContainerChangeRequest request : schedDecreaseRequests) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing decrease request:" + request);
- }
- // handle decrease request
- decreaseContainer(request, attempt);
- }
- }
-
- protected abstract void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt);
-
@Override
public N getSchedulerNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
@@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler
}
}
- protected void handleExecutionTypeUpdates(
- SchedulerApplicationAttempt appAttempt,
- List<UpdateContainerRequest> promotionRequests,
- List<UpdateContainerRequest> demotionRequests) {
+ protected void handleContainerUpdates(
+ SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
+ List<UpdateContainerRequest> promotionRequests =
+ updates.getPromotionRequests();
if (promotionRequests != null && !promotionRequests.isEmpty()) {
LOG.info("Promotion Update requests : " + promotionRequests);
- handlePromotionRequests(appAttempt, promotionRequests);
+ // Promotion is technically an increase request from
+ // 0 resources to target resources.
+ handleIncreaseRequests(appAttempt, promotionRequests);
}
+ List<UpdateContainerRequest> increaseRequests =
+ updates.getIncreaseRequests();
+ if (increaseRequests != null && !increaseRequests.isEmpty()) {
+ LOG.info("Resource increase requests : " + increaseRequests);
+ handleIncreaseRequests(appAttempt, increaseRequests);
+ }
+ List<UpdateContainerRequest> demotionRequests =
+ updates.getDemotionRequests();
if (demotionRequests != null && !demotionRequests.isEmpty()) {
LOG.info("Demotion Update requests : " + demotionRequests);
- handleDemotionRequests(appAttempt, demotionRequests);
+ // Demotion is technically a decrease request from initial
+ // to 0 resources
+ handleDecreaseRequests(appAttempt, demotionRequests);
+ }
+ List<UpdateContainerRequest> decreaseRequests =
+ updates.getDecreaseRequests();
+ if (decreaseRequests != null && !decreaseRequests.isEmpty()) {
+ LOG.info("Resource decrease requests : " + decreaseRequests);
+ handleDecreaseRequests(appAttempt, decreaseRequests);
}
}
- private void handlePromotionRequests(
+ private void handleIncreaseRequests(
SchedulerApplicationAttempt applicationAttempt,
List<UpdateContainerRequest> updateContainerRequests) {
for (UpdateContainerRequest uReq : updateContainerRequests) {
@@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler
}
}
- private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+ private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
List<UpdateContainerRequest> demotionRequests) {
OpportunisticContainerContext oppCntxt =
appAttempt.getOpportunisticContainerContext();
@@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer =
rmContext.getScheduler().getRMContainer(uReq.getContainerId());
if (rmContainer != null) {
- if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
- rmContainer.getContainer())) {
- RMContainer demotedRMContainer =
- createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
- appAttempt.addToNewlyDemotedContainers(
- uReq.getContainerId(), demotedRMContainer);
+ SchedulerNode schedulerNode = rmContext.getScheduler()
+ .getSchedulerNode(rmContainer.getContainer().getNodeId());
+ if (appAttempt.getUpdateContext()
+ .checkAndAddToOutstandingDecreases(uReq, schedulerNode,
+ rmContainer.getContainer())) {
+ if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE ==
+ uReq.getContainerUpdateType()) {
+ RMContainer demotedRMContainer =
+ createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+ appAttempt.addToNewlyDemotedContainers(
+ uReq.getContainerId(), demotedRMContainer);
+ } else {
+ RMContainer demotedRMContainer = createDecreasedRMContainer(
+ appAttempt, uReq, rmContainer);
+ appAttempt.addToNewlyDecreasedContainers(
+ uReq.getContainerId(), demotedRMContainer);
+ }
} else {
appAttempt.addToUpdateContainerErrors(
UpdateContainerError.newInstance(
RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
}
} else {
- LOG.warn("Cannot demote non-existent (or completed) Container ["
- + uReq.getContainerId() + "]");
+ LOG.warn("Cannot demote/decrease non-existent (or completed) " +
+ "Container [" + uReq.getContainerId() + "]");
}
}
}
+ private RMContainer createDecreasedRMContainer(
+ SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq,
+ RMContainer rmContainer) {
+ SchedulerRequestKey sk =
+ SchedulerRequestKey.extractFrom(rmContainer.getContainer());
+ Container decreasedContainer = BuilderUtils.newContainer(
+ ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+ appAttempt.getNewContainerId()),
+ rmContainer.getContainer().getNodeId(),
+ rmContainer.getContainer().getNodeHttpAddress(),
+ Resources.none(),
+ sk.getPriority(), null, rmContainer.getExecutionType(),
+ sk.getAllocationRequestId());
+ decreasedContainer.setVersion(rmContainer.getContainer().getVersion());
+ RMContainer newRmContainer = new RMContainerImpl(decreasedContainer,
+ sk, appAttempt.getApplicationAttemptId(),
+ decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext,
+ rmContainer.isRemotelyAllocated());
+ appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer);
+ ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+ decreasedContainer.getNodeId()).allocateContainer(newRmContainer);
+ return newRmContainer;
+ }
+
private RMContainer createDemotedRMContainer(
SchedulerApplicationAttempt appAttempt,
OpportunisticContainerContext oppCntxt,
@@ -1162,4 +1225,36 @@ public abstract class AbstractYarnScheduler
return SchedulerUtils.createOpportunisticRmContainer(
rmContext, demotedContainer, false);
}
+
+ /**
+ * Rollback container update after expiry.
+ * @param containerId ContainerId.
+ */
+ protected void rollbackContainerUpdate(
+ ContainerId containerId) {
+ RMContainer rmContainer = getRMContainer(containerId);
+ if (rmContainer == null) {
+ LOG.info("Cannot rollback resource for container " + containerId
+ + ". The container does not exist.");
+ return;
+ }
+ T app = getCurrentAttemptForContainer(containerId);
+ if (getCurrentAttemptForContainer(containerId) == null) {
+ LOG.info("Cannot rollback resource for container " + containerId
+ + ". The application that the container "
+ + "belongs to does not exist.");
+ return;
+ }
+
+ if (Resources.fitsIn(rmContainer.getLastConfirmedResource(),
+ rmContainer.getContainer().getResource())) {
+ LOG.info("Roll back resource for container " + containerId);
+ handleDecreaseRequests(app, Arrays.asList(
+ UpdateContainerRequest.newInstance(
+ rmContainer.getContainer().getVersion(),
+ rmContainer.getContainerId(),
+ ContainerUpdateType.DECREASE_RESOURCE,
+ rmContainer.getLastConfirmedResource(), null)));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 48ecd2e..bff9c41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -90,9 +90,7 @@ public class AppSchedulingInfo {
schedulerKeys = new ConcurrentSkipListMap<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
- final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
- SchedContainerChangeRequest>>> containerIncreaseRequestMap =
- new ConcurrentHashMap<>();
+
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -158,137 +156,6 @@ public class AppSchedulingInfo {
LOG.info("Application " + applicationId + " requests cleared");
}
- public boolean hasIncreaseRequest(NodeId nodeId) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? false : requestsOnNode.size() > 0;
- } finally {
- this.readLock.unlock();
- }
- }
-
- public Map<ContainerId, SchedContainerChangeRequest>
- getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- return requestsOnNode == null ? null : requestsOnNode.get(
- schedulerKey);
- } finally {
- this.readLock.unlock();
- }
- }
-
- /**
- * return true if any of the existing increase requests are updated,
- * false if none of them are updated
- */
- public boolean updateIncreaseRequests(
- List<SchedContainerChangeRequest> increaseRequests) {
- boolean resourceUpdated = false;
-
- try {
- this.writeLock.lock();
- for (SchedContainerChangeRequest r : increaseRequests) {
- if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
- LOG.warn("rmContainer's state is not RUNNING, for increase request"
- + " with container-id=" + r.getContainerId());
- continue;
- }
- try {
- RMServerUtils.checkSchedContainerChangeRequest(r, true);
- } catch (YarnException e) {
- LOG.warn("Error happens when checking increase request, Ignoring.."
- + " exception=", e);
- continue;
- }
- NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- requestsOnNode = new TreeMap<>();
- containerIncreaseRequestMap.put(nodeId, requestsOnNode);
- }
-
- SchedContainerChangeRequest prevChangeRequest =
- getIncreaseRequest(nodeId,
- r.getRMContainer().getAllocatedSchedulerKey(),
- r.getContainerId());
- if (null != prevChangeRequest) {
- if (Resources.equals(prevChangeRequest.getTargetCapacity(),
- r.getTargetCapacity())) {
- // increase request hasn't changed
- continue;
- }
-
- // remove the old one, as we will use the new one going forward
- removeIncreaseRequest(nodeId,
- prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
- prevChangeRequest.getContainerId());
- }
-
- if (Resources.equals(r.getTargetCapacity(),
- r.getRMContainer().getAllocatedResource())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to increase container " + r.getContainerId()
- + ", target capacity = previous capacity = " + prevChangeRequest
- + ". Will ignore this increase request.");
- }
- continue;
- }
-
- // add the new one
- resourceUpdated = true;
- insertIncreaseRequest(r);
- }
- return resourceUpdated;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- /**
- * Insert increase request, adding any missing items in the data-structure
- * hierarchy.
- */
- private void insertIncreaseRequest(SchedContainerChangeRequest request) {
- NodeId nodeId = request.getNodeId();
- SchedulerRequestKey schedulerKey =
- request.getRMContainer().getAllocatedSchedulerKey();
- ContainerId containerId = request.getContainerId();
-
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- requestsOnNode = new HashMap<>();
- containerIncreaseRequestMap.put(nodeId, requestsOnNode);
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- if (null == requestsOnNodeWithPriority) {
- requestsOnNodeWithPriority = new TreeMap<>();
- requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
- incrementSchedulerKeyReference(schedulerKey);
- }
-
- requestsOnNodeWithPriority.put(containerId, request);
-
- // update resources
- String partition = request.getRMContainer().getNodeLabelExpression();
- Resource delta = request.getDeltaCapacity();
- appResourceUsage.incPending(partition, delta);
- queue.incPendingResource(partition, delta);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added increase request:" + request.getContainerId()
- + " delta=" + delta);
- }
- }
private void incrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) {
@@ -312,73 +179,6 @@ public class AppSchedulingInfo {
}
}
- public boolean removeIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- this.writeLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return false;
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- if (null == requestsOnNodeWithPriority) {
- return false;
- }
-
- SchedContainerChangeRequest request =
- requestsOnNodeWithPriority.remove(containerId);
-
- // remove hierarchies if it becomes empty
- if (requestsOnNodeWithPriority.isEmpty()) {
- requestsOnNode.remove(schedulerKey);
- decrementSchedulerKeyReference(schedulerKey);
- }
- if (requestsOnNode.isEmpty()) {
- containerIncreaseRequestMap.remove(nodeId);
- }
-
- if (request == null) {
- return false;
- }
-
- // update queue's pending resource if request exists
- String partition = request.getRMContainer().getNodeLabelExpression();
- Resource delta = request.getDeltaCapacity();
- appResourceUsage.decPending(partition, delta);
- queue.decPendingResource(partition, delta);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("remove increase request:" + request);
- }
-
- return true;
- } finally {
- this.writeLock.unlock();
- }
- }
-
- public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- this.readLock.lock();
- Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
- requestsOnNode = containerIncreaseRequestMap.get(nodeId);
- if (null == requestsOnNode) {
- return null;
- }
-
- Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
- requestsOnNode.get(schedulerKey);
- return requestsOnNodeWithPriority == null ? null
- : requestsOnNodeWithPriority.get(containerId);
- } finally {
- this.readLock.unlock();
- }
- }
-
public ContainerUpdateContext getUpdateContext() {
return updateContext;
}
@@ -514,21 +314,6 @@ public class AppSchedulingInfo {
appResourceUsage.decPending(partition, toDecrease);
}
- private boolean hasRequestLabelChanged(ResourceRequest requestOne,
- ResourceRequest requestTwo) {
- String requestOneLabelExp = requestOne.getNodeLabelExpression();
- String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
- // First request label expression can be null and second request
- // is not null then we have to consider it as changed.
- if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
- return true;
- }
- // If the label is not matching between both request when
- // requestOneLabelExp is not null.
- return ((null != requestOneLabelExp) && !(requestOneLabelExp
- .equals(requestTwoLabelExp)));
- }
-
/**
* The ApplicationMaster is updating the placesBlacklistedByApp used for
* containers other than AMs.
@@ -601,22 +386,6 @@ public class AppSchedulingInfo {
return ret;
}
- public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
- try {
- readLock.lock();
- for (SchedulerRequestKey key : schedulerKeys.keySet()) {
- SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
- if (null != ps) {
- return ps;
- }
- }
- return null;
- } finally {
- readLock.unlock();
- }
-
- }
-
public PendingAsk getNextPendingAsk() {
try {
readLock.lock();
@@ -666,56 +435,6 @@ public class AppSchedulingInfo {
}
}
- public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
- NodeId nodeId = increaseRequest.getNodeId();
- SchedulerRequestKey schedulerKey =
- increaseRequest.getRMContainer().getAllocatedSchedulerKey();
- ContainerId containerId = increaseRequest.getContainerId();
- Resource deltaCapacity = increaseRequest.getDeltaCapacity();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("allocated increase request : applicationId=" + applicationId
- + " container=" + containerId + " host="
- + increaseRequest.getNodeId() + " user=" + user + " resource="
- + deltaCapacity);
- }
- try {
- this.writeLock.lock();
- // Set queue metrics
- queue.getMetrics().allocateResources(user, deltaCapacity);
- // remove the increase request from pending increase request map
- removeIncreaseRequest(nodeId, schedulerKey, containerId);
- // update usage
- appResourceUsage.incUsed(increaseRequest.getNodePartition(),
- deltaCapacity);
- } finally {
- this.writeLock.unlock();
- }
- }
-
- public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
- // Delta is negative when it's a decrease request
- Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrease container : applicationId=" + applicationId
- + " container=" + decreaseRequest.getContainerId() + " host="
- + decreaseRequest.getNodeId() + " user=" + user + " resource="
- + absDelta);
- }
-
- try {
- this.writeLock.lock();
- // Set queue metrics
- queue.getMetrics().releaseResources(user, absDelta);
-
- // update usage
- appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
- } finally {
- this.writeLock.unlock();
- }
- }
-
public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 7381250..5ac2ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -28,17 +28,19 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+ .RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ .SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,43 +60,37 @@ public class ContainerUpdateContext {
private final Map<SchedulerRequestKey, Map<Resource,
Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
- private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+ private final Map<ContainerId, Resource> outstandingDecreases =
+ new HashMap<>();
private final AppSchedulingInfo appSchedulingInfo;
ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
this.appSchedulingInfo = appSchedulingInfo;
}
- private synchronized boolean isBeingIncreased(Container container) {
- Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
- outstandingIncreases.get(
- new SchedulerRequestKey(container.getPriority(),
- container.getAllocationRequestId(), container.getId()));
- if (resourceMap != null) {
- Map<NodeId, Set<ContainerId>> locationMap =
- resourceMap.get(container.getResource());
- if (locationMap != null) {
- Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
- if (containerIds != null && !containerIds.isEmpty()) {
- return containerIds.contains(container.getId());
- }
- }
- }
- return false;
- }
-
/**
* Add the container to outstanding decreases.
+ * @param updateReq UpdateContainerRequest.
+ * @param schedulerNode SchedulerNode.
* @param container Container.
- * @return true if updated to outstanding decreases was successful.
+ * @return true if the container was added to the outstanding decreases.
*/
public synchronized boolean checkAndAddToOutstandingDecreases(
+ UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
Container container) {
- if (isBeingIncreased(container)
- || outstandingDecreases.contains(container.getId())) {
+ if (outstandingDecreases.containsKey(container.getId())) {
return false;
}
- outstandingDecreases.add(container.getId());
+ if (ContainerUpdateType.DECREASE_RESOURCE ==
+ updateReq.getContainerUpdateType()) {
+ SchedulerRequestKey updateKey = new SchedulerRequestKey
+ (container.getPriority(),
+ container.getAllocationRequestId(), container.getId());
+ cancelPreviousRequest(schedulerNode, updateKey);
+ outstandingDecreases.put(container.getId(), updateReq.getCapability());
+ } else {
+ outstandingDecreases.put(container.getId(), container.getResource());
+ }
return true;
}
@@ -117,35 +113,63 @@ public class ContainerUpdateContext {
if (resourceMap == null) {
resourceMap = new HashMap<>();
outstandingIncreases.put(schedulerKey, resourceMap);
+ } else {
+ // Updating Resource for an existing increase container
+ if (ContainerUpdateType.INCREASE_RESOURCE ==
+ updateRequest.getContainerUpdateType()) {
+ cancelPreviousRequest(schedulerNode, schedulerKey);
+ } else {
+ return false;
+ }
}
+ Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
Map<NodeId, Set<ContainerId>> locationMap =
- resourceMap.get(container.getResource());
+ resourceMap.get(resToIncrease);
if (locationMap == null) {
locationMap = new HashMap<>();
- resourceMap.put(container.getResource(), locationMap);
+ resourceMap.put(resToIncrease, locationMap);
}
Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
if (containerIds == null) {
containerIds = new HashSet<>();
locationMap.put(container.getNodeId(), containerIds);
}
- if (containerIds.contains(container.getId())
- || outstandingDecreases.contains(container.getId())) {
+ if (outstandingDecreases.containsKey(container.getId())) {
return false;
}
- containerIds.add(container.getId());
- Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
- new HashMap<>();
- Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
- Map<String, ResourceRequest> resMap =
- createResourceRequests(rmContainer, schedulerNode,
- schedulerKey, resToIncrease);
- updateResReqs.put(schedulerKey, resMap);
- appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+ containerIds.add(container.getId());
+ if (!Resources.isNone(resToIncrease)) {
+ Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+ new HashMap<>();
+ Map<String, ResourceRequest> resMap =
+ createResourceRequests(rmContainer, schedulerNode,
+ schedulerKey, resToIncrease);
+ updateResReqs.put(schedulerKey, resMap);
+ appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+ }
return true;
}
+ private void cancelPreviousRequest(SchedulerNode schedulerNode,
+ SchedulerRequestKey schedulerKey) {
+ SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
+ appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+ if (schedulingPlacementSet != null) {
+ Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+ .getResourceRequests();
+ ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
+ // Decrement the pending using a dummy RR with
+ // resource = prev update req capability
+ if (prevReq != null) {
+ appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
+ schedulerKey, Container.newInstance(UNDEFINED,
+ schedulerNode.getNodeID(), "host:port",
+ prevReq.getCapability(), schedulerKey.getPriority(), null));
+ }
+ }
+ }
+
private Map<String, ResourceRequest> createResourceRequests(
RMContainer rmContainer, SchedulerNode schedulerNode,
SchedulerRequestKey schedulerKey, Resource resToIncrease) {
@@ -171,10 +195,16 @@ public class ContainerUpdateContext {
ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
return rmContainer.getContainer().getResource();
}
- // TODO: Fix this for container increase..
- // This has to equal the Resources in excess of fitsIn()
- // for container increase and is equal to the container total
- // resource for Promotion.
+ if (updateReq.getContainerUpdateType() ==
+ ContainerUpdateType.INCREASE_RESOURCE) {
+ // This has to equal the Resources in excess of fitsIn()
+ // for container increase and is equal to the container total
+ // resource for Promotion.
+ Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
+ rmContainer.getContainer().getResource());
+ return Resources.add(maxCap,
+ Resources.negate(rmContainer.getContainer().getResource()));
+ }
return null;
}
@@ -228,6 +258,7 @@ public class ContainerUpdateContext {
/**
* Check if a new container is to be matched up against an outstanding
* Container increase request.
+ * @param node SchedulerNode.
* @param schedulerKey SchedulerRequestKey.
* @param rmContainer RMContainer.
* @return ContainerId.
@@ -264,4 +295,80 @@ public class ContainerUpdateContext {
}
return retVal;
}
+
+ /**
+ * Swaps the existing RMContainer's and the temp RMContainer's internal
+ * container references after adjusting the resources in each.
+ * @param tempRMContainer Temp RMContainer.
+ * @param existingRMContainer Existing RMContainer.
+ * @param updateType Update Type.
+ * @return Existing RMContainer after swapping the container references.
+ */
+ public RMContainer swapContainer(RMContainer tempRMContainer,
+ RMContainer existingRMContainer, ContainerUpdateType updateType) {
+ ContainerId matchedContainerId = existingRMContainer.getContainerId();
+ // Swap updated container with the existing container
+ Container tempContainer = tempRMContainer.getContainer();
+
+ Resource updatedResource = createUpdatedResource(
+ tempContainer, existingRMContainer.getContainer(), updateType);
+ Resource resourceToRelease = createResourceToRelease(
+ existingRMContainer.getContainer(), updateType);
+ Container newContainer = Container.newInstance(matchedContainerId,
+ existingRMContainer.getContainer().getNodeId(),
+ existingRMContainer.getContainer().getNodeHttpAddress(),
+ updatedResource,
+ existingRMContainer.getContainer().getPriority(), null,
+ tempContainer.getExecutionType());
+ newContainer.setAllocationRequestId(
+ existingRMContainer.getContainer().getAllocationRequestId());
+ newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+ tempRMContainer.getContainer().setResource(resourceToRelease);
+ tempRMContainer.getContainer().setExecutionType(
+ existingRMContainer.getContainer().getExecutionType());
+
+ ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+ return existingRMContainer;
+ }
+
+ /**
+ * Returns the resource that the container will finally be assigned with
+ * at the end of the update operation.
+ * @param tempContainer Temporary Container created for the operation.
+ * @param existingContainer Existing Container.
+ * @param updateType Update Type.
+ * @return Final Resource.
+ */
+ private Resource createUpdatedResource(Container tempContainer,
+ Container existingContainer, ContainerUpdateType updateType) {
+ if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+ return Resources.add(existingContainer.getResource(),
+ tempContainer.getResource());
+ } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ return outstandingDecreases.get(existingContainer.getId());
+ } else {
+ return existingContainer.getResource();
+ }
+ }
+
+ /**
+ * Returns the resources that need to be released at the end of the update
+ * operation.
+ * @param existingContainer Existing Container.
+ * @param updateType Update Type.
+ * @return Resources to be released.
+ */
+ private Resource createResourceToRelease(Container existingContainer,
+ ContainerUpdateType updateType) {
+ if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+ return Resources.none();
+ } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){
+ return Resources.add(existingContainer.getResource(),
+ Resources.negate(
+ outstandingDecreases.get(existingContainer.getId())));
+ } else {
+ return existingContainer.getResource();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 0e79838..f894a40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+ .RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+ protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
- protected List<RMContainer> tempContainerToKill = new ArrayList<>();
protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
protected Set<NMToken> updatedNMTokens = new HashSet<>();
@@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),
ContainerUpdateType.INCREASE_RESOURCE == updateType));
+ if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+ Collections.singletonList(rmContainer.getContainer())));
+ }
}
return container;
}
@@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
- public void addToNewlyDemotedContainers(ContainerId containerId,
+ public synchronized void addToNewlyDemotedContainers(ContainerId containerId,
RMContainer rmContainer) {
newlyDemotedContainers.put(containerId, rmContainer);
}
+ public synchronized void addToNewlyDecreasedContainers(
+ ContainerId containerId, RMContainer rmContainer) {
+ newlyDecreasedContainers.put(containerId, rmContainer);
+ }
+
protected synchronized void addToUpdateContainerErrors(
UpdateContainerError error) {
updateContainerErrors.add(error);
@@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) {
- if (oppContainerContext == null) {
- newlyAllocatedContainers.add(rmContainer);
- return;
- }
ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq(
node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
@@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// occurs when using MiniYARNCluster to test).
tempContainerToKill.add(rmContainer);
} else {
- newlyPromotedContainers.put(matchedContainerId, rmContainer);
+ RMContainer existingContainer = getRMContainer(matchedContainerId);
+ // If this container was already GUARANTEED, then it is an
+ // increase, else it's a promotion
+ if (existingContainer == null ||
+ EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED,
+ RMContainerState.EXPIRED, RMContainerState.RELEASED).contains(
+ existingContainer.getState())) {
+ tempContainerToKill.add(rmContainer);
+ } else {
+ if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) {
+ newlyIncreasedContainers.put(matchedContainerId, rmContainer);
+ } else {
+ newlyPromotedContainers.put(matchedContainerId, rmContainer);
+ }
+ }
}
} else {
newlyAllocatedContainers.add(rmContainer);
@@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
public List<Container> pullNewlyPromotedContainers() {
- return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+ return pullNewlyUpdatedContainers(newlyPromotedContainers,
ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
}
public List<Container> pullNewlyDemotedContainers() {
- return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+ return pullNewlyUpdatedContainers(newlyDemotedContainers,
ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
}
+ public List<Container> pullNewlyIncreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyIncreasedContainers,
+ ContainerUpdateType.INCREASE_RESOURCE);
+ }
+
+ public List<Container> pullNewlyDecreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyDecreasedContainers,
+ ContainerUpdateType.DECREASE_RESOURCE);
+ }
+
public List<UpdateContainerError> pullUpdateContainerErrors() {
List<UpdateContainerError> errors =
new ArrayList<>(updateContainerErrors);
@@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
* GUARANTEED to OPPORTUNISTIC.
* @return Newly Promoted and Demoted containers
*/
- private List<Container> pullContainersWithUpdatedExecType(
+ private List<Container> pullNewlyUpdatedContainers(
Map<ContainerId, RMContainer> newlyUpdatedContainers,
ContainerUpdateType updateTpe) {
List<Container> updatedContainers = new ArrayList<>();
- if (oppContainerContext == null) {
+ if (oppContainerContext == null &&
+ (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe
+ || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
return updatedContainers;
}
try {
@@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
while (i.hasNext()) {
Map.Entry<ContainerId, RMContainer> entry = i.next();
ContainerId matchedContainerId = entry.getKey();
- RMContainer rmContainer = entry.getValue();
-
- // swap containers
- RMContainer existingRMContainer = swapContainer(
- rmContainer, matchedContainerId);
- getUpdateContext().removeFromOutstandingUpdate(
- rmContainer.getAllocatedSchedulerKey(),
- existingRMContainer.getContainer());
- Container updatedContainer = updateContainerAndNMToken(
- existingRMContainer, updateTpe);
- updatedContainers.add(updatedContainer);
-
- tempContainerToKill.add(rmContainer);
+ RMContainer tempRMContainer = entry.getValue();
+
+ RMContainer existingRMContainer =
+ getRMContainer(matchedContainerId);
+ if (existingRMContainer != null) {
+ // swap containers
+ existingRMContainer = getUpdateContext().swapContainer(
+ tempRMContainer, existingRMContainer, updateTpe);
+ getUpdateContext().removeFromOutstandingUpdate(
+ tempRMContainer.getAllocatedSchedulerKey(),
+ existingRMContainer.getContainer());
+ Container updatedContainer = updateContainerAndNMToken(
+ existingRMContainer, updateTpe);
+ updatedContainers.add(updatedContainer);
+ }
+ tempContainerToKill.add(tempRMContainer);
i.remove();
}
// Release all temporary containers
@@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
- private RMContainer swapContainer(RMContainer rmContainer, ContainerId
- matchedContainerId) {
- RMContainer existingRMContainer =
- getRMContainer(matchedContainerId);
- if (existingRMContainer != null) {
- // Swap updated container with the existing container
- Container updatedContainer = rmContainer.getContainer();
-
- Container newContainer = Container.newInstance(matchedContainerId,
- existingRMContainer.getContainer().getNodeId(),
- existingRMContainer.getContainer().getNodeHttpAddress(),
- updatedContainer.getResource(),
- existingRMContainer.getContainer().getPriority(), null,
- updatedContainer.getExecutionType());
- newContainer.setAllocationRequestId(
- existingRMContainer.getContainer().getAllocationRequestId());
- newContainer.setVersion(existingRMContainer.getContainer().getVersion());
-
- rmContainer.getContainer().setResource(
- existingRMContainer.getContainer().getResource());
- rmContainer.getContainer().setExecutionType(
- existingRMContainer.getContainer().getExecutionType());
-
- ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
- }
- return existingRMContainer;
- }
-
- private List<Container> pullNewlyUpdatedContainers(
- Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
- try {
- writeLock.lock();
- List <Container> returnContainerList = new ArrayList <Container>(
- updatedContainerMap.size());
-
- Iterator<Entry<ContainerId, RMContainer>> i =
- updatedContainerMap.entrySet().iterator();
- while (i.hasNext()) {
- RMContainer rmContainer = i.next().getValue();
- Container updatedContainer = updateContainerAndNMToken(rmContainer,
- increase ? ContainerUpdateType.INCREASE_RESOURCE :
- ContainerUpdateType.DECREASE_RESOURCE);
- if (updatedContainer != null) {
- returnContainerList.add(updatedContainer);
- i.remove();
- }
- }
- return returnContainerList;
- } finally {
- writeLock.unlock();
- }
-
- }
-
- public List<Container> pullNewlyIncreasedContainers() {
- return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
- }
-
- public List<Container> pullNewlyDecreasedContainers() {
- return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
- }
-
public List<NMToken> pullUpdatedNMTokens() {
try {
writeLock.lock();
@@ -1252,68 +1228,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public ResourceUsage getSchedulingResourceUsage() {
return attemptResourceUsage;
}
-
- public boolean removeIncreaseRequest(NodeId nodeId,
- SchedulerRequestKey schedulerKey, ContainerId containerId) {
- try {
- writeLock.lock();
- return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
- containerId);
- } finally {
- writeLock.unlock();
- }
- }
-
- public boolean updateIncreaseRequests(
- List<SchedContainerChangeRequest> increaseRequests) {
- try {
- writeLock.lock();
- return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
- } finally {
- writeLock.unlock();
- }
- }
-
- private void changeContainerResource(
- SchedContainerChangeRequest changeRequest, boolean increase) {
- try {
- writeLock.lock();
- if (increase) {
- appSchedulingInfo.increaseContainer(changeRequest);
- } else{
- appSchedulingInfo.decreaseContainer(changeRequest);
- }
-
- RMContainer changedRMContainer = changeRequest.getRMContainer();
- changedRMContainer.handle(
- new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
- changeRequest.getTargetCapacity(), increase));
-
- // remove pending and not pulled by AM newly-increased or
- // decreased-containers and add the new one
- if (increase) {
- newlyDecreasedContainers.remove(changeRequest.getContainerId());
- newlyIncreasedContainers.put(changeRequest.getContainerId(),
- changedRMContainer);
- } else{
- newlyIncreasedContainers.remove(changeRequest.getContainerId());
- newlyDecreasedContainers.put(changeRequest.getContainerId(),
- changedRMContainer);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void decreaseContainer(
- SchedContainerChangeRequest decreaseRequest) {
- changeContainerResource(decreaseRequest, false);
- }
-
- public void increaseContainer(
- SchedContainerChangeRequest increaseRequest) {
- changeContainerResource(increaseRequest, true);
- }
public void setAppAMNodePartitionName(String partitionName) {
this.appAMNodePartitionName = partitionName;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 9c2dff3..db17b42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -180,49 +180,6 @@ public abstract class SchedulerNode {
}
/**
- * Change the resources allocated for a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Change in the resource allocation.
- * @param increase True if the change is an increase of allocation.
- */
- protected synchronized void changeContainerResource(ContainerId containerId,
- Resource deltaResource, boolean increase) {
- if (increase) {
- deductUnallocatedResource(deltaResource);
- } else {
- addUnallocatedResource(deltaResource);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug((increase ? "Increased" : "Decreased") + " container "
- + containerId + " of capacity " + deltaResource + " on host "
- + rmNode.getNodeAddress() + ", which has " + numContainers
- + " containers, " + getAllocatedResource() + " used and "
- + getUnallocatedResource() + " available after allocation");
- }
- }
-
- /**
- * Increase the resources allocated to a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Increase of resource allocation.
- */
- public synchronized void increaseContainer(ContainerId containerId,
- Resource deltaResource) {
- changeContainerResource(containerId, deltaResource, true);
- }
-
- /**
- * Decrease the resources allocated to a container.
- * @param containerId Identifier of the container to change.
- * @param deltaResource Decrease of resource allocation.
- */
- public synchronized void decreaseContainer(ContainerId containerId,
- Resource deltaResource) {
- changeContainerResource(containerId, deltaResource, false);
- }
-
- /**
* Get unallocated resources on the node.
* @return Unallocated resources on the node
*/
@@ -280,7 +237,6 @@ public abstract class SchedulerNode {
if (info == null) {
return;
}
-
if (!releasedByNode && info.launchedOnNode) {
// wait until node reports container has completed
return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e9ef319..aa60c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -453,14 +453,13 @@ public abstract class AbstractCSQueue implements CSQueue {
}
void allocateResource(Resource clusterResource,
- Resource resource, String nodePartition, boolean changeContainerResource) {
+ Resource resource, String nodePartition) {
try {
writeLock.lock();
queueUsage.incUsed(nodePartition, resource);
- if (!changeContainerResource) {
- ++numContainers;
- }
+ ++numContainers;
+
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
} finally {
@@ -469,7 +468,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
protected void releaseResource(Resource clusterResource,
- Resource resource, String nodePartition, boolean changeContainerResource) {
+ Resource resource, String nodePartition) {
try {
writeLock.lock();
queueUsage.decUsed(nodePartition, resource);
@@ -477,9 +476,7 @@ public abstract class AbstractCSQueue implements CSQueue {
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
- if (!changeContainerResource) {
- --numContainers;
- }
+ --numContainers;
} finally {
writeLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a65b3d2..6d30386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -231,14 +231,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
boolean sortQueues);
/**
- * We have a reserved increased container in the queue, we need to unreserve
- * it. Since we just want to cancel the reserved increase request instead of
- * stop the container, we shouldn't call completedContainer for such purpose.
- */
- public void unreserveIncreasedContainer(Resource clusterResource,
- FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
-
- /**
* Get the number of applications in the queue.
* @return number of applications
*/
@@ -333,13 +325,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* new resource asked
*/
public void decPendingResource(String nodeLabel, Resource resourceToDec);
-
- /**
- * Decrease container resource in the queue
- */
- public void decreaseContainer(Resource clusterResource,
- SchedContainerChangeRequest decreaseRequest,
- FiCaSchedulerApp app) throws InvalidResourceRequestException;
/**
* Get valid Node Labels for this queue
http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3517764..20ea607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -60,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -85,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -99,7 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -872,43 +868,6 @@ public class CapacityScheduler extends
}
}
- private LeafQueue updateIncreaseRequests(
- List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
- // When application has some pending to-be-removed resource requests,
- app.removedToBeRemovedIncreaseRequests();
-
- if (null == increaseRequests || increaseRequests.isEmpty()) {
- return null;
- }
-
- // Pre-process increase requests
- List<SchedContainerChangeRequest> schedIncreaseRequests =
- createSchedContainerChangeRequests(increaseRequests, true);
- LeafQueue leafQueue = (LeafQueue) app.getQueue();
-
- try {
- /*
- * Acquire application's lock here to make sure application won't
- * finish when updateIncreaseRequest is called.
- */
- app.getWriteLock().lock();
- // make sure we aren't stopping/removing the application
- // when the allocate comes in
- if (app.isStopped()) {
- return null;
- }
- // Process increase resource requests
- if (app.updateIncreaseRequests(schedIncreaseRequests)) {
- return leafQueue;
- }
- } finally {
- app.getWriteLock().unlock();
- }
-
-
- return null;
- }
-
@Override
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
@@ -920,21 +879,13 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
- // Handle promotions and demotions
- handleExecutionTypeUpdates(
- application, updateRequests.getPromotionRequests(),
- updateRequests.getDemotionRequests());
+ // Handle all container updates
+ handleContainerUpdates(application, updateRequests);
// Release containers
releaseContainers(release, application);
- // update increase requests
- LeafQueue updateDemandForQueue =
- updateIncreaseRequests(updateRequests.getIncreaseRequests(),
- application);
-
- // Decrease containers
- decreaseContainers(updateRequests.getDecreaseRequests(), application);
+ LeafQueue updateDemandForQueue = null;
// Sanity check for new allocation requests
normalizeRequests(ask);
@@ -959,8 +910,7 @@ public class CapacityScheduler extends
}
// Update application requests
- if (application.updateResourceRequests(ask) && (updateDemandForQueue
- == null)) {
+ if (application.updateResourceRequests(ask)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}
@@ -1466,7 +1416,7 @@ public class CapacityScheduler extends
(ContainerExpiredSchedulerEvent) event;
ContainerId containerId = containerExpiredEvent.getContainerId();
if (containerExpiredEvent.isIncrease()) {
- rollbackContainerResource(containerId);
+ rollbackContainerUpdate(containerId);
} else {
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
@@ -1618,31 +1568,6 @@ public class CapacityScheduler extends
}
}
- private void rollbackContainerResource(
- ContainerId containerId) {
- RMContainer rmContainer = getRMContainer(containerId);
- if (rmContainer == null) {
- LOG.info("Cannot rollback resource for container " + containerId
- + ". The container does not exist.");
- return;
- }
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Cannot rollback resource for container " + containerId
- + ". The application that the container "
- + "belongs to does not exist.");
- return;
- }
- LOG.info("Roll back resource for container " + containerId);
-
- SchedulerNode schedulerNode = getSchedulerNode(
- rmContainer.getAllocatedNode());
- SchedContainerChangeRequest decreaseRequest =
- new SchedContainerChangeRequest(this.rmContext, schedulerNode,
- rmContainer, rmContainer.getLastConfirmedResource());
- decreaseContainer(decreaseRequest, application);
- }
-
@Override
protected void completedContainerInternal(
RMContainer rmContainer, ContainerStatus containerStatus,
@@ -1676,32 +1601,6 @@ public class CapacityScheduler extends
rmContainer, containerStatus, event, null, true);
}
- @Override
- protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
- SchedulerApplicationAttempt attempt) {
- RMContainer rmContainer = decreaseRequest.getRMContainer();
- // Check container status before doing decrease
- if (rmContainer.getState() != RMContainerState.RUNNING) {
- LOG.info(
- "Trying to decrease a container not in RUNNING state, container="
- + rmContainer + " state=" + rmContainer.getState().name());
- return;
- }
- FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
- LeafQueue queue = (LeafQueue) attempt.getQueue();
- try {
- queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
- // Notify RMNode that the container can be pulled by NodeManager in the
- // next heartbeat
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
- Collections.singletonList(rmContainer.getContainer())));
- } catch (InvalidResourceRequestException e) {
- LOG.warn("Error happens when checking decrease request, Ignoring.."
- + " exception=", e);
- }
- }
-
@Lock(Lock.NoLock.class)
@VisibleForTesting
@Override
@@ -2386,8 +2285,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, true),
getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.isIncreasedAllocation(),
- csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ false), csAssignment.getType(),
+ csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
@@ -2403,8 +2302,8 @@ public class CapacityScheduler extends
getSchedulerContainer(rmContainer, false),
getSchedulerContainersToRelease(csAssignment),
getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
- false), csAssignment.isIncreasedAllocation(),
- csAssignment.getType(), csAssignment.getRequestLocalityType(),
+ false), csAssignment.getType(),
+ csAssignment.getRequestLocalityType(),
csAssignment.getSchedulingMode() != null ?
csAssignment.getSchedulingMode() :
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org