You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/25 15:54:35 UTC
[05/51] [abbrv] hadoop git commit: YARN-3356. Capacity Scheduler
FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label.
Contributed by Wangda Tan
YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/586348e4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/586348e4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/586348e4
Branch: refs/heads/YARN-2139
Commit: 586348e4cbf197188057d6b843a6701cfffdaff3
Parents: d81109e
Author: Jian He <ji...@apache.org>
Authored: Fri Mar 20 13:54:01 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Mar 20 13:54:01 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/AbstractYarnScheduler.java | 5 +-
.../scheduler/AppSchedulingInfo.java | 27 ++-
.../server/resourcemanager/scheduler/Queue.java | 20 +++
.../scheduler/ResourceUsage.java | 19 ++-
.../scheduler/SchedulerApplicationAttempt.java | 50 +++---
.../scheduler/SchedulerNode.java | 14 ++
.../scheduler/capacity/AbstractCSQueue.java | 24 +++
.../scheduler/capacity/LeafQueue.java | 29 +++-
.../scheduler/common/fica/FiCaSchedulerApp.java | 17 +-
.../scheduler/fair/FSAppAttempt.java | 11 +-
.../resourcemanager/scheduler/fair/FSQueue.java | 8 +
.../scheduler/fifo/FifoScheduler.java | 12 +-
.../yarn/server/resourcemanager/MockAM.java | 24 +--
.../capacity/TestCapacityScheduler.java | 167 ++++++++++++++++++-
.../scheduler/capacity/TestChildQueueOrder.java | 3 +-
.../capacity/TestContainerAllocation.java | 70 +-------
.../scheduler/capacity/TestReservations.java | 6 +-
.../scheduler/capacity/TestUtils.java | 129 ++++++++++++++
19 files changed, 509 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bbd018a..046b7b1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -65,6 +65,9 @@ Release 2.8.0 - UNRELEASED
YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks
via devaraj)
+ YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to
+ track used-resources-by-label. (Wangda Tan via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 968a767..e1f94cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -358,14 +358,15 @@ public abstract class AbstractYarnScheduler
container));
// recover scheduler node
- nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+ SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
+ schedulerNode.recoverContainer(rmContainer);
// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
// recover scheduler attempt
- schedulerAttempt.recoverContainer(rmContainer);
+ schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
// set master container for the current running AMContainer for this
// attempt.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 1324c7d..84ebe9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -191,6 +189,16 @@ public class AppSchedulingInfo {
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
+
+ // update queue:
+ queue.incPendingResource(
+ request.getNodeLabelExpression(),
+ Resources.multiply(request.getCapability(),
+ request.getNumContainers()));
+ if (lastRequest != null) {
+ queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+ Resources.multiply(lastRequestCapability, lastRequestContainers));
+ }
}
}
}
@@ -376,6 +384,9 @@ public class AppSchedulingInfo {
if (numOffSwitchContainers == 0) {
checkForDeactivation();
}
+
+ queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
+ offSwitchRequest.getCapability());
}
synchronized private void checkForDeactivation() {
@@ -404,6 +415,12 @@ public class AppSchedulingInfo {
request.getCapability());
newMetrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
+
+ Resource delta = Resources.multiply(request.getCapability(),
+ request.getNumContainers());
+ // Update Queue
+ queue.decPendingResource(request.getNodeLabelExpression(), delta);
+ newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
}
}
oldMetrics.moveAppFrom(this);
@@ -423,6 +440,12 @@ public class AppSchedulingInfo {
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability());
+
+ // Update Queue
+ queue.decPendingResource(
+ request.getNodeLabelExpression(),
+ Resources.multiply(request.getCapability(),
+ request.getNumContainers()));
}
}
metrics.finishAppAttempt(applicationId, pending, user);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index 4663a91..02003c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -90,4 +90,24 @@ public interface Queue {
* @return default label expression
*/
public String getDefaultNodeLabelExpression();
+
+ /**
+ * When new outstanding resource is asked, calling this will increase pending
+ * resource in a queue.
+ *
+ * @param nodeLabel asked by application
+ * @param resourceToInc new resource asked
+ */
+ public void incPendingResource(String nodeLabel, Resource resourceToInc);
+
+ /**
+ * When an outstanding resource is fulfilled or canceled, calling this will
+ * decrease pending resource in a queue.
+ *
+ * @param nodeLabel
+ * node label asked by the application
+ * @param resourceToDec
+ * resource that was fulfilled or canceled
+ */
+ public void decPendingResource(String nodeLabel, Resource resourceToDec);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index de44bbe..36ee4da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -75,14 +76,17 @@ public class ResourceUsage {
};
}
+ public Resource getUsed() {
+ return resArr[ResourceType.USED.idx];
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + resArr[0] + "%, ");
sb.append("pending=" + resArr[1] + "%, ");
sb.append("am_used=" + resArr[2] + "%, ");
- sb.append("reserved=" + resArr[3] + "%, ");
- sb.append("headroom=" + resArr[4] + "%}");
+ sb.append("reserved=" + resArr[3] + "%}");
return sb.toString();
}
}
@@ -117,6 +121,17 @@ public class ResourceUsage {
public void setUsed(Resource res) {
setUsed(NL, res);
}
+
+ public void copyAllUsed(ResourceUsage other) {
+ try {
+ writeLock.lock();
+ for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
+ setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
public void setUsed(String label, Resource res) {
_set(label, ResourceType.USED, res);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 9816699..799a5c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt {
private final Multiset<Priority> reReservations = HashMultiset.create();
- protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
- protected Resource currentConsumption = Resource.newInstance(0, 0);
- private Resource amResource = Resources.none();
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
+
+ protected ResourceUsage attemptResourceUsage = new ResourceUsage();
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@@ -217,11 +216,11 @@ public class SchedulerApplicationAttempt {
}
public Resource getAMResource() {
- return amResource;
+ return attemptResourceUsage.getAMUsed();
}
public void setAMResource(Resource amResource) {
- this.amResource = amResource;
+ attemptResourceUsage.setAMUsed(amResource);
}
public boolean isAmRunning() {
@@ -260,7 +259,7 @@ public class SchedulerApplicationAttempt {
@Stable
@Private
public synchronized Resource getCurrentReservation() {
- return currentReservation;
+ return attemptResourceUsage.getReserved();
}
public Queue getQueue() {
@@ -311,8 +310,8 @@ public class SchedulerApplicationAttempt {
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
-
- Resources.addTo(currentReservation, container.getResource());
+ attemptResourceUsage.incReserved(node.getPartition(),
+ container.getResource());
// Reset the re-reservation count
resetReReservations(priority);
@@ -336,7 +335,7 @@ public class SchedulerApplicationAttempt {
+ " reserved container " + rmContainer + " on node " + node
+ ". This attempt currently has " + reservedContainers.size()
+ " reserved containers at priority " + priority
- + "; currentReservation " + currentReservation.getMemory());
+ + "; currentReservation " + container.getResource());
}
return rmContainer;
@@ -402,9 +401,9 @@ public class SchedulerApplicationAttempt {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
- LOG.debug("showRequests:" + " application=" + getApplicationId() +
- " headRoom=" + getHeadroom() +
- " currentConsumption=" + currentConsumption.getMemory());
+ LOG.debug("showRequests:" + " application=" + getApplicationId()
+ + " headRoom=" + getHeadroom() + " currentConsumption="
+ + attemptResourceUsage.getUsed().getMemory());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
@@ -415,7 +414,7 @@ public class SchedulerApplicationAttempt {
}
public Resource getCurrentConsumption() {
- return currentConsumption;
+ return attemptResourceUsage.getUsed();
}
public static class ContainersAndNMTokensAllocation {
@@ -548,12 +547,17 @@ public class SchedulerApplicationAttempt {
}
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
- AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
+ AggregateAppResourceUsage runningResourceUsage =
+ getRunningAggregateAppResourceUsage();
+ Resource usedResourceClone =
+ Resources.clone(attemptResourceUsage.getUsed());
+ Resource reservedResourceClone =
+ Resources.clone(attemptResourceUsage.getReserved());
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
- reservedContainers.size(), Resources.clone(currentConsumption),
- Resources.clone(currentReservation),
- Resources.add(currentConsumption, currentReservation),
- resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
+ reservedContainers.size(), usedResourceClone, reservedResourceClone,
+ Resources.add(usedResourceClone, reservedResourceClone),
+ runningResourceUsage.getMemorySeconds(),
+ runningResourceUsage.getVcoreSeconds());
}
public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
@@ -572,7 +576,7 @@ public class SchedulerApplicationAttempt {
SchedulerApplicationAttempt appAttempt) {
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
- this.currentConsumption = appAttempt.getCurrentConsumption();
+ this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
this.resourceLimit = appAttempt.getResourceLimit();
// this.currentReservation = appAttempt.currentReservation;
// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
@@ -603,7 +607,8 @@ public class SchedulerApplicationAttempt {
this.queue = newQueue;
}
- public synchronized void recoverContainer(RMContainer rmContainer) {
+ public synchronized void recoverContainer(SchedulerNode node,
+ RMContainer rmContainer) {
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer);
@@ -613,8 +618,9 @@ public class SchedulerApplicationAttempt {
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
liveContainers.put(rmContainer.getContainerId(), rmContainer);
- Resources.addTo(currentConsumption, rmContainer.getContainer()
- .getResource());
+ attemptResourceUsage.incUsed(node.getPartition(), rmContainer
+ .getContainer().getResource());
+
// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
// newlyAllocatedContainers.add(rmContainer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 2901134..f03663a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -294,4 +295,17 @@ public abstract class SchedulerNode {
public void updateLabels(Set<String> labels) {
this.labels = labels;
}
+
+ /**
+ * Get the partition this node belongs to. If the node-labels of this node are
+ * empty or null, it belongs to the NO_LABEL partition. Since we only support
+ * one partition per node (YARN-2694), the first label is its partition.
+ */
+ public String getPartition() {
+ if (this.labels == null || this.labels.isEmpty()) {
+ return RMNodeLabelsManager.NO_LABEL;
+ } else {
+ return this.labels.iterator().next();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 4e53060..3cd85ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -509,4 +509,28 @@ public abstract class AbstractCSQueue implements CSQueue {
// non-empty
return false;
}
+
+ @Override
+ public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+ if (nodeLabel == null) {
+ nodeLabel = RMNodeLabelsManager.NO_LABEL;
+ }
+ // ResourceUsage has its own lock; no additional lock is needed here.
+ queueUsage.incPending(nodeLabel, resourceToInc);
+ if (null != parent) {
+ parent.incPendingResource(nodeLabel, resourceToInc);
+ }
+ }
+
+ @Override
+ public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+ if (nodeLabel == null) {
+ nodeLabel = RMNodeLabelsManager.NO_LABEL;
+ }
+ // ResourceUsage has its own lock; no additional lock is needed here.
+ queueUsage.decPending(nodeLabel, resourceToDec);
+ if (null != parent) {
+ parent.decPendingResource(nodeLabel, resourceToDec);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index fa0e280..3e5405d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -739,6 +739,15 @@ public class LeafQueue extends AbstractCSQueue {
return labels;
}
+ private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
+ FiCaSchedulerNode node) {
+ String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
+ if (null == askedNodeLabel) {
+ askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
+ }
+ return askedNodeLabel.equals(node.getPartition());
+ }
+
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
@@ -796,6 +805,14 @@ public class LeafQueue extends AbstractCSQueue {
if (application.getTotalRequiredResources(priority) <= 0) {
continue;
}
+
+ // Does the node-label-expression of this offswitch resource request
+ // match the node's label?
+ // If it does not match, skip to the next priority.
+ if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+ continue;
+ }
+
if (!this.reservationsContinueLooking) {
if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
@@ -825,7 +842,7 @@ public class LeafQueue extends AbstractCSQueue {
}
// Check user limit
- if (!assignToUser(clusterResource, application.getUser(), userLimit,
+ if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, true, requestedNodeLabels)) {
break;
}
@@ -1076,7 +1093,7 @@ public class LeafQueue extends AbstractCSQueue {
}
@Private
- protected synchronized boolean assignToUser(Resource clusterResource,
+ protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
boolean checkReservations, Set<String> requestLabels) {
User user = getUser(userName);
@@ -1094,7 +1111,8 @@ public class LeafQueue extends AbstractCSQueue {
limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking && checkReservations) {
+ if (this.reservationsContinueLooking && checkReservations
+ && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
@@ -1305,7 +1323,7 @@ public class LeafQueue extends AbstractCSQueue {
}
// Check user limit
- if (!assignToUser(clusterResource, application.getUser(), userLimit,
+ if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
application, false, null)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
@@ -1622,7 +1640,8 @@ public class LeafQueue extends AbstractCSQueue {
node, rmContainer);
} else {
removed =
- application.containerCompleted(rmContainer, containerStatus, event);
+ application.containerCompleted(rmContainer, containerStatus,
+ event, node.getPartition());
node.releaseContainer(container);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 10f5c20..e041389 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -90,7 +90,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
- ContainerStatus containerStatus, RMContainerEventType event) {
+ ContainerStatus containerStatus, RMContainerEventType event,
+ String partition) {
// Remove from the list of containers
if (null == liveContainers.remove(rmContainer.getContainerId())) {
@@ -122,7 +123,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
- Resources.subtractFrom(currentConsumption, containerResource);
+ attemptResourceUsage.decUsed(partition, containerResource);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
@@ -156,7 +157,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
- Resources.addTo(currentConsumption, container.getResource());
+ attemptResourceUsage.incUsed(node.getPartition(),
+ container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
@@ -198,12 +200,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
- Resources.subtractFrom(currentReservation, resource);
+ this.attemptResourceUsage.decReserved(node.getPartition(), resource);
LOG.info("Application " + getApplicationId() + " unreserved "
- + " on node " + node + ", currently has " + reservedContainers.size()
- + " at priority " + priority + "; currentReservation "
- + currentReservation);
+ + " on node " + node + ", currently has "
+ + reservedContainers.size() + " at priority " + priority
+ + "; currentReservation " + this.attemptResourceUsage.getReserved()
+ + " on node-label=" + node.getPartition());
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 67103d1..dfde5ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -142,7 +142,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
- Resources.subtractFrom(currentConsumption, containerResource);
+ this.attemptResourceUsage.decUsed(containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
@@ -164,11 +164,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
- Resources.subtractFrom(currentReservation, resource);
+ this.attemptResourceUsage.decReserved(resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
- + node + ", currently has " + reservedContainers.size() + " at priority "
- + priority + "; currentReservation " + currentReservation);
+ + node + ", currently has " + reservedContainers.size()
+ + " at priority " + priority + "; currentReservation "
+ + this.attemptResourceUsage.getReserved());
}
@Override
@@ -339,7 +340,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
- Resources.addTo(currentConsumption, container.getResource());
+ this.attemptResourceUsage.incUsed(container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 349464e..1562bf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -291,4 +291,12 @@ public abstract class FSQueue implements Queue, Schedulable {
// TODO, add implementation for FS
return null;
}
+
+ @Override
+ public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+ }
+
+ @Override
+ public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index beb3ab5..b8c419c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -200,6 +201,14 @@ public class FifoScheduler extends
// TODO add implementation for FIFO scheduler
return null;
}
+
+ @Override
+ public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+ }
+
+ @Override
+ public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+ }
};
public FifoScheduler() {
@@ -870,7 +879,8 @@ public class FifoScheduler extends
}
// Inform the application
- application.containerCompleted(rmContainer, containerStatus, event);
+ application.containerCompleted(rmContainer, containerStatus, event,
+ RMNodeLabelsManager.NO_LABEL);
// Inform the node
node.releaseContainer(container);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 494f5a4..f62fdb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -164,17 +164,19 @@ public class MockAM {
public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
int containers, String labelExpression) throws Exception {
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
- for (String host : hosts) {
- // only add host/rack request when asked host isn't ANY
- if (!host.equals(ResourceRequest.ANY)) {
- ResourceRequest hostReq =
- createResourceReq(host, memory, priority, containers,
- labelExpression);
- reqs.add(hostReq);
- ResourceRequest rackReq =
- createResourceReq("/default-rack", memory, priority, containers,
- labelExpression);
- reqs.add(rackReq);
+ if (hosts != null) {
+ for (String host : hosts) {
+ // only add host/rack request when asked host isn't ANY
+ if (!host.equals(ResourceRequest.ANY)) {
+ ResourceRequest hostReq =
+ createResourceReq(host, memory, priority, containers,
+ labelExpression);
+ reqs.add(hostReq);
+ ResourceRequest rackReq =
+ createResourceReq("/default-rack", memory, priority, containers,
+ labelExpression);
+ reqs.add(rackReq);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e30f441..aaa615d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -29,11 +29,13 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
@@ -111,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -128,10 +131,13 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
public class TestCapacityScheduler {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -2557,6 +2563,165 @@ public class TestCapacityScheduler {
Assert.fail("Shouldn't successfully allocate containers for am2, "
+ "queue-a's max capacity will be violated if container allocated");
}
+
+ @SuppressWarnings("unchecked")
+ private <E> Set<E> toSet(E... elements) {
+ Set<E> set = Sets.newHashSet(elements);
+ return set;
+ }
+
+ @Test
+ public void testQueueHierarchyPendingResourceUpdate() throws Exception {
+ Configuration conf =
+ TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm = new MockRM(conf, memStore) {
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.start();
+ MockNM nm1 = // label = x
+ new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+ nm1.registerNode();
+
+ MockNM nm2 = // label = ""
+ new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+ nm2.registerNode();
+
+ // Launch app1 in queue=a1
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // Launch app2 in queue=b1
+ RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // am1 asks for 8 * 1GB container for no label
+ am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+ Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+ null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, null);
+
+ // am2 asks for 8 * 1GB container for no label
+ am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+ Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+ null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 8 * GB, null);
+ checkPendingResource(rm, "b", 8 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 16 * GB, null);
+
+ // am2 asks for 8 * 1GB container in another priority for no label
+ am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+ Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
+ null);
+
+ checkPendingResource(rm, "a1", 8 * GB, null);
+ checkPendingResource(rm, "a", 8 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 24 * GB, null);
+
+ // am1 asks 4 GB resource instead of 8 * GB for priority=1
+ am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+ Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
+ null);
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+
+ // am1 asks 8 * GB resource which label=x
+ am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+ Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
+ true, "x")), null);
+
+ checkPendingResource(rm, "a1", 4 * GB, null);
+ checkPendingResource(rm, "a", 4 * GB, null);
+ checkPendingResource(rm, "a1", 8 * GB, "x");
+ checkPendingResource(rm, "a", 8 * GB, "x");
+ checkPendingResource(rm, "b1", 16 * GB, null);
+ checkPendingResource(rm, "b", 16 * GB, null);
+ // root = a + b
+ checkPendingResource(rm, "root", 20 * GB, null);
+ checkPendingResource(rm, "root", 8 * GB, "x");
+
+ // some containers allocated for am1, pending resource should decrease
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertTrue(rm.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ Assert.assertTrue(rm.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ // some containers could be allocated for am2 when we allocating containers
+ // for am1, just check if pending resource of b1/b/root > 0
+ checkPendingResourceGreaterThanZero(rm, "b1", null);
+ checkPendingResourceGreaterThanZero(rm, "b", null);
+ // root = a + b
+ checkPendingResourceGreaterThanZero(rm, "root", null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+
+ // complete am2, pending resource should be 0 now
+ AppAttemptRemovedSchedulerEvent appRemovedEvent =
+ new AppAttemptRemovedSchedulerEvent(
+ am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
+ rm.getResourceScheduler().handle(appRemovedEvent);
+
+ checkPendingResource(rm, "a1", 0 * GB, null);
+ checkPendingResource(rm, "a", 0 * GB, null);
+ checkPendingResource(rm, "a1", 0 * GB, "x");
+ checkPendingResource(rm, "a", 0 * GB, "x");
+ checkPendingResource(rm, "b1", 0 * GB, null);
+ checkPendingResource(rm, "b", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, null);
+ checkPendingResource(rm, "root", 0 * GB, "x");
+ }
+
+ private void checkPendingResource(MockRM rm, String queueName, int memory,
+ String label) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ CSQueue queue = cs.getQueue(queueName);
+ Assert.assertEquals(
+ memory,
+ queue.getQueueResourceUsage()
+ .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+ .getMemory());
+ }
+
+ private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+ String label) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ CSQueue queue = cs.getQueue(queueName);
+ Assert.assertTrue(queue.getQueueResourceUsage()
+ .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+ .getMemory() > 0);
+ }
// Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
// lesser than minimumAllocation
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 71dc523..23b31fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -247,7 +247,8 @@ public class TestChildQueueOrder {
// Stub an App and its containerCompleted
FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
- any(ContainerStatus.class),any(RMContainerEventType.class));
+ any(ContainerStatus.class), any(RMContainerEventType.class),
+ any(String.class));
Priority priority = TestUtils.createMockPriority(1);
ContainerAllocationExpirer expirer =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index fe61eab..03b8f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -328,19 +328,6 @@ public class TestContainerAllocation {
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
- private Configuration getConfigurationWithDefaultQueueLabels(
- Configuration config) {
- final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-
- CapacitySchedulerConfiguration conf =
- (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
- new CapacitySchedulerConfiguration(config);
- conf.setDefaultNodeLabelExpression(A, "x");
- conf.setDefaultNodeLabelExpression(B, "y");
- return conf;
- }
-
private Configuration getConfigurationWithQueueLabels(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
@@ -406,57 +393,6 @@ public class TestContainerAllocation {
return set;
}
- private Configuration getComplexConfigurationWithQueueLabels(
- Configuration config) {
- CapacitySchedulerConfiguration conf =
- new CapacitySchedulerConfiguration(config);
-
- // Define top-level queues
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
-
- final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- conf.setCapacity(A, 10);
- conf.setMaximumCapacity(A, 10);
- conf.setAccessibleNodeLabels(A, toSet("x", "y"));
- conf.setCapacityByLabel(A, "x", 100);
- conf.setCapacityByLabel(A, "y", 50);
-
- final String B = CapacitySchedulerConfiguration.ROOT + ".b";
- conf.setCapacity(B, 90);
- conf.setMaximumCapacity(B, 100);
- conf.setAccessibleNodeLabels(B, toSet("y", "z"));
- conf.setCapacityByLabel(B, "y", 50);
- conf.setCapacityByLabel(B, "z", 100);
-
- // Define 2nd-level queues
- final String A1 = A + ".a1";
- conf.setQueues(A, new String[] {"a1"});
- conf.setCapacity(A1, 100);
- conf.setMaximumCapacity(A1, 100);
- conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
- conf.setDefaultNodeLabelExpression(A1, "x");
- conf.setCapacityByLabel(A1, "x", 100);
- conf.setCapacityByLabel(A1, "y", 100);
-
- conf.setQueues(B, new String[] {"b1", "b2"});
- final String B1 = B + ".b1";
- conf.setCapacity(B1, 50);
- conf.setMaximumCapacity(B1, 50);
- conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
-
- final String B2 = B + ".b2";
- conf.setCapacity(B2, 50);
- conf.setMaximumCapacity(B2, 50);
- conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
- conf.setCapacityByLabel(B2, "y", 100);
- conf.setCapacityByLabel(B2, "z", 100);
-
- return conf;
- }
-
@Test (timeout = 300000)
public void testContainerAllocationWithSingleUserLimits() throws Exception {
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
@@ -468,7 +404,7 @@ public class TestContainerAllocation {
NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
- MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
@@ -554,7 +490,7 @@ public class TestContainerAllocation {
RMNodeLabelsManager.EMPTY_STRING_SET));
// inject node label manager
- MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
+ MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
@@ -711,7 +647,7 @@ public class TestContainerAllocation {
NodeId.newInstance("h2", 0), toSet("y")));
// inject node label manager
- MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c5b7587..e8a8243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -1058,19 +1058,19 @@ public class TestReservations {
// set limit so subtrace reservations it can continue
Resource limit = Resources.createResource(12 * GB, 0);
- boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+ boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
true, null);
assertTrue(res);
// tell it not to check for reservations and should fail as already over
// limit
- res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
// should now return false since feature off
- res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
assertFalse(res);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 9e352a7..62135b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.Sets;
+
public class TestUtils {
private static final Log LOG = LogFactory.getLog(TestUtils.class);
@@ -216,4 +218,131 @@ public class TestUtils {
when(container.getPriority()).thenReturn(priority);
return container;
}
+
+ @SuppressWarnings("unchecked")
+ private static <E> Set<E> toSet(E... elements) {
+ Set<E> set = Sets.newHashSet(elements);
+ return set;
+ }
+
+ /**
+ * Get a queue structure:
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * | | |
+ * a1 b1 c1
+ * (x) (y)
+ * </pre>
+ */
+ public static Configuration getConfigurationWithQueueLabels(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 15);
+ conf.setAccessibleNodeLabels(A, toSet("x"));
+ conf.setCapacityByLabel(A, "x", 100);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 20);
+ conf.setAccessibleNodeLabels(B, toSet("y"));
+ conf.setCapacityByLabel(B, "y", 100);
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ conf.setCapacity(C, 70);
+ conf.setMaximumCapacity(C, 70);
+ conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+ conf.setCapacityByLabel(A1, "x", 100);
+
+ final String B1 = B + ".b1";
+ conf.setQueues(B, new String[] {"b1"});
+ conf.setCapacity(B1, 100);
+ conf.setMaximumCapacity(B1, 100);
+ conf.setCapacityByLabel(B1, "y", 100);
+
+ final String C1 = C + ".c1";
+ conf.setQueues(C, new String[] {"c1"});
+ conf.setCapacity(C1, 100);
+ conf.setMaximumCapacity(C1, 100);
+
+ return conf;
+ }
+
+ public static Configuration getComplexConfigurationWithQueueLabels(
+ Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
+
+ // Define top-level queues
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+ conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ conf.setCapacity(A, 10);
+ conf.setMaximumCapacity(A, 10);
+ conf.setAccessibleNodeLabels(A, toSet("x", "y"));
+ conf.setCapacityByLabel(A, "x", 100);
+ conf.setCapacityByLabel(A, "y", 50);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ conf.setCapacity(B, 90);
+ conf.setMaximumCapacity(B, 100);
+ conf.setAccessibleNodeLabels(B, toSet("y", "z"));
+ conf.setCapacityByLabel(B, "y", 50);
+ conf.setCapacityByLabel(B, "z", 100);
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ conf.setQueues(A, new String[] {"a1"});
+ conf.setCapacity(A1, 100);
+ conf.setMaximumCapacity(A1, 100);
+ conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
+ conf.setDefaultNodeLabelExpression(A1, "x");
+ conf.setCapacityByLabel(A1, "x", 100);
+ conf.setCapacityByLabel(A1, "y", 100);
+
+ conf.setQueues(B, new String[] {"b1", "b2"});
+ final String B1 = B + ".b1";
+ conf.setCapacity(B1, 50);
+ conf.setMaximumCapacity(B1, 50);
+ conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+ final String B2 = B + ".b2";
+ conf.setCapacity(B2, 50);
+ conf.setMaximumCapacity(B2, 50);
+ conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
+ conf.setCapacityByLabel(B2, "y", 100);
+ conf.setCapacityByLabel(B2, "z", 100);
+
+ return conf;
+ }
+
+ public static Configuration getConfigurationWithDefaultQueueLabels(
+ Configuration config) {
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+
+ CapacitySchedulerConfiguration conf =
+ (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
+ new CapacitySchedulerConfiguration(config);
+ conf.setDefaultNodeLabelExpression(A, "x");
+ conf.setDefaultNodeLabelExpression(B, "y");
+ return conf;
+ }
}