You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2013/06/03 20:13:54 UTC
svn commit: r1489087 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/
hadoop-yarn/hadoop-yarn-server/hado...
Author: acmurthy
Date: Mon Jun 3 18:13:53 2013
New Revision: 1489087
URL: http://svn.apache.org/r1489087
Log:
YARN-398. Make it possible to specify hard locality constraints in resource requests for CapacityScheduler. Contributed by Arun C. Murthy.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Jun 3 18:13:53 2013
@@ -111,6 +111,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-326. Add multi-resource scheduling to the fair scheduler.
(sandyr via tucu)
+ YARN-398. Make it possible to specify hard locality constraints in resource
+ requests for CapacityScheduler. (acmurthy)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java Mon Jun 3 18:13:53 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -31,12 +32,14 @@ public class CSAssignment {
private NodeType type;
private final RMContainer excessReservation;
private final FiCaSchedulerApp application;
+ private final boolean skipped;
public CSAssignment(Resource resource, NodeType type) {
this.resource = resource;
this.type = type;
this.application = null;
this.excessReservation = null;
+ this.skipped = false;
}
public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
@@ -44,8 +47,16 @@ public class CSAssignment {
this.type = NodeType.NODE_LOCAL;
this.application = application;
this.excessReservation = excessReservation;
+ this.skipped = false;
+ }
+
+ public CSAssignment(boolean skipped) {
+ this.resource = Resources.createResource(0, 0);
+ this.type = NodeType.NODE_LOCAL;
+ this.application = null;
+ this.excessReservation = null;
+ this.skipped = skipped;
}
-
public Resource getResource() {
return resource;
@@ -67,6 +78,10 @@ public class CSAssignment {
return excessReservation;
}
+ public boolean getSkipped() {
+ return skipped;
+ }
+
@Override
public String toString() {
return resource.getMemory() + ":" + type;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Jun 3 18:13:53 2013
@@ -784,6 +784,8 @@ public class LeafQueue implements CSQueu
private static final CSAssignment NULL_ASSIGNMENT =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+ private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
@@ -853,6 +855,13 @@ public class LeafQueue implements CSQueu
assignContainersOnNode(clusterResource, node, application, priority,
null);
+ // Did the application skip this node?
+ if (assignment.getSkipped()) {
+ // Don't count 'skipped nodes' as a scheduling opportunity!
+ application.subtractSchedulingOpportunity(priority);
+ continue;
+ }
+
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(
@@ -1104,73 +1113,88 @@ public class LeafQueue implements CSQueu
Resource assigned = Resources.none();
// Data-local
- assigned =
- assignNodeLocalContainers(clusterResource, node, application, priority,
- reservedContainer);
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- assigned, Resources.none())) {
- return new CSAssignment(assigned, NodeType.NODE_LOCAL);
+ ResourceRequest nodeLocalResourceRequest =
+ application.getResourceRequest(priority, node.getHostName());
+ if (nodeLocalResourceRequest != null) {
+ assigned =
+ assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+ node, application, priority, reservedContainer);
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ assigned, Resources.none())) {
+ return new CSAssignment(assigned, NodeType.NODE_LOCAL);
+ }
}
// Rack-local
- assigned =
- assignRackLocalContainers(clusterResource, node, application, priority,
- reservedContainer);
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- assigned, Resources.none())) {
- return new CSAssignment(assigned, NodeType.RACK_LOCAL);
+ ResourceRequest rackLocalResourceRequest =
+ application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalResourceRequest != null) {
+ if (!rackLocalResourceRequest.getRelaxLocality()) {
+ return SKIP_ASSIGNMENT;
+ }
+
+ assigned =
+ assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+ node, application, priority, reservedContainer);
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ assigned, Resources.none())) {
+ return new CSAssignment(assigned, NodeType.RACK_LOCAL);
+ }
}
// Off-switch
- return new CSAssignment(
- assignOffSwitchContainers(clusterResource, node, application,
- priority, reservedContainer),
- NodeType.OFF_SWITCH);
+ ResourceRequest offSwitchResourceRequest =
+ application.getResourceRequest(priority, ResourceRequest.ANY);
+ if (offSwitchResourceRequest != null) {
+ if (!offSwitchResourceRequest.getRelaxLocality()) {
+ return SKIP_ASSIGNMENT;
+ }
+
+ return new CSAssignment(
+ assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+ node, application, priority, reservedContainer),
+ NodeType.OFF_SWITCH);
+ }
+
+ return SKIP_ASSIGNMENT;
}
- private Resource assignNodeLocalContainers(Resource clusterResource,
+ private Resource assignNodeLocalContainers(
+ Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
- ResourceRequest request =
- application.getResourceRequest(priority, node.getHostName());
- if (request != null) {
- if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority,
- request, NodeType.NODE_LOCAL, reservedContainer);
- }
+ if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
+ reservedContainer)) {
+ return assignContainer(clusterResource, node, application, priority,
+ nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
}
return Resources.none();
}
- private Resource assignRackLocalContainers(Resource clusterResource,
+ private Resource assignRackLocalContainers(
+ Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
- ResourceRequest request =
- application.getResourceRequest(priority, node.getRackName());
- if (request != null) {
- if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority, request,
- NodeType.RACK_LOCAL, reservedContainer);
- }
+ if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
+ reservedContainer)) {
+ return assignContainer(clusterResource, node, application, priority,
+ rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
}
+
return Resources.none();
}
- private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node,
- FiCaSchedulerApp application, Priority priority,
+ private Resource assignOffSwitchContainers(
+ Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
- ResourceRequest request =
- application.getResourceRequest(priority, ResourceRequest.ANY);
- if (request != null) {
- if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority, request,
- NodeType.OFF_SWITCH, reservedContainer);
- }
+ if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
+ reservedContainer)) {
+ return assignContainer(clusterResource, node, application, priority,
+ offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
}
+
return Resources.none();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Mon Jun 3 18:13:53 2013
@@ -332,6 +332,11 @@ public class FiCaSchedulerApp extends Sc
schedulingOpportunities.count(priority) + 1);
}
+ synchronized public void subtractSchedulingOpportunity(Priority priority) {
+ int count = schedulingOpportunities.count(priority) - 1;
+ this.schedulingOpportunities.setCount(priority, Math.max(count, 0));
+ }
+
/**
* Return the number of times the application has been given an opportunity
* to schedule a task at the given priority since the last time it
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Mon Jun 3 18:13:53 2013
@@ -512,7 +512,7 @@ public class TestApplicationLimits {
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
@@ -531,7 +531,7 @@ public class TestApplicationLimits {
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
@@ -550,7 +550,7 @@ public class TestApplicationLimits {
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Mon Jun 3 18:13:53 2013
@@ -292,8 +292,8 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
+ priority, recordFactory)));
// Start testing...
@@ -414,12 +414,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
// Start testing...
@@ -547,12 +547,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
/**
* Start testing...
@@ -640,12 +640,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
/**
* Start testing...
@@ -679,8 +679,8 @@ public class TestLeafQueue {
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+ priority, recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
@@ -696,8 +696,8 @@ public class TestLeafQueue {
// Check headroom for app_2
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
+ priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
@@ -757,12 +757,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
+ priority, recordFactory)));
/**
* Start testing...
@@ -791,12 +791,12 @@ public class TestLeafQueue {
// Submit resource requests for other apps now to 'activate' them
app_2.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
+ priority, recordFactory)));
app_3.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
@@ -919,12 +919,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+ priority, recordFactory)));
// Start testing...
@@ -1021,19 +1021,19 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+ priority, recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
ArrayList<ResourceRequest> appRequests_1 =
new ArrayList<ResourceRequest>(4);
appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
- priority, recordFactory));
+ true, priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
// Start testing...
@@ -1127,12 +1127,12 @@ public class TestLeafQueue {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
- TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
- recordFactory)));
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+ priority, recordFactory)));
// Start testing...
@@ -1242,19 +1242,19 @@ public class TestLeafQueue {
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
- priority, recordFactory));
+ true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@@ -1313,13 +1313,13 @@ public class TestLeafQueue {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
- priority, recordFactory));
+ true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority));
@@ -1385,31 +1385,31 @@ public class TestLeafQueue {
Priority priority_1 = TestUtils.createMockPriority(1);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0, 1*GB, 1,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1, 1*GB, 1,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
- priority_1, recordFactory));
+ true, priority_1, recordFactory));
// P2
Priority priority_2 = TestUtils.createMockPriority(2);
app_0_requests_0.add(
TestUtils.createResourceRequest(host_2, 2*GB, 1,
- priority_2, recordFactory));
+ true, priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_2, 2*GB, 1,
- priority_2, recordFactory));
+ true, priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
- priority_2, recordFactory));
+ true, priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
@@ -1513,19 +1513,19 @@ public class TestLeafQueue {
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_0_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_0, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
- priority, recordFactory));
+ true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
- priority, recordFactory));
+ true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
@@ -1557,7 +1557,7 @@ public class TestLeafQueue {
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
- priority, recordFactory));
+ true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
// No allocation on node_0_1 even though it's node/rack local since
@@ -1731,7 +1731,174 @@ public class TestLeafQueue {
c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
}
-
+
+ @Test
+ public void testLocalityConstraints() throws Exception {
+
+ // Manipulate queue 'a'
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ // User
+ String user_0 = "user_0";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ mock(ActiveUsersManager.class), rmContext));
+ a.submitApplication(app_0, user_0, A);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+ mock(ActiveUsersManager.class), rmContext));
+ a.submitApplication(app_1, user_0, A);
+
+ // Setup some nodes and racks
+ String host_0_0 = "127.0.0.1";
+ String rack_0 = "rack_0";
+ FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+ String host_0_1 = "127.0.0.2";
+ FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+
+
+ String host_1_0 = "127.0.0.3";
+ String rack_1 = "rack_1";
+ FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+ String host_1_1 = "127.0.0.4";
+ FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB);
+
+ final int numNodes = 4;
+ Resource clusterResource = Resources.createResource(
+ numNodes * (8*GB), numNodes * 1);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests
+ // resourceName: <priority, memory, #containers, relaxLocality>
+ // host_0_0: < 1, 1GB, 1, true >
+ // host_0_1: < null >
+ // rack_0: < null > <----
+ // host_1_0: < 1, 1GB, 1, true >
+ // host_1_1: < null >
+ // rack_1: < 1, 1GB, 1, false > <----
+ // ANY: < 1, 1GB, 1, false > <----
+ // Availability:
+ // host_0_0: 8G
+ // host_0_1: 8G
+ // host_1_0: 8G
+ // host_1_1: 8G
+ Priority priority = TestUtils.createMockPriority(1);
+ List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
+ true, priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
+ true, priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+ false, priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
+ false, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+ app_0_requests_0.clear();
+
+ //
+ // Start testing...
+ //
+
+ // node_0_1
+ // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
+ a.assignContainers(clusterResource, node_0_1);
+ verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
+
+ // resourceName: <priority, memory, #containers, relaxLocality>
+ // host_0_0: < 1, 1GB, 1, true >
+ // host_0_1: < null >
+ // rack_0: < null > <----
+ // host_1_0: < 1, 1GB, 1, true >
+ // host_1_1: < null >
+ // rack_1: < 1, 1GB, 1, false > <----
+ // ANY: < 1, 1GB, 1, false > <----
+ // Availability:
+ // host_0_0: 8G
+ // host_0_1: 8G
+ // host_1_0: 8G
+ // host_1_1: 8G
+
+ // node_1_1
+ // Shouldn't allocate since RR(rack_1) = relax: false
+ a.assignContainers(clusterResource, node_1_1);
+ verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
+
+ // Allow rack-locality for rack_1
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+ true, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+ app_0_requests_0.clear();
+
+ // resourceName: <priority, memory, #containers, relaxLocality>
+ // host_0_0: < 1, 1GB, 1, true >
+ // host_0_1: < null >
+ // rack_0: < null >
+ // host_1_0: < 1, 1GB, 1, true >
+ // host_1_1: < null >
+ // rack_1: < 1, 1GB, 1, true > <----
+ // ANY: < 1, 1GB, 1, false >
+ // Availability:
+ // host_0_0: 8G
+ // host_0_1: 8G
+ // host_1_0: 8G
+ // host_1_1: 8G
+
+ // node_1_1
+ // Now, should allocate since RR(rack_1) = relax: true
+ a.assignContainers(clusterResource, node_1_1);
+ verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority));
+ assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+ // Now sanity-check node_local
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+ false, priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
+ false, priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+ app_0_requests_0.clear();
+
+ // resourceName: <priority, memory, #containers, relaxLocality>
+ // host_0_0: < 1, 1GB, 1, true >
+ // host_0_1: < null >
+ // rack_0: < null >
+ // host_1_0: < 1, 1GB, 1, true >
+ // host_1_1: < null >
+ // rack_1: < 1, 1GB, 1, false > <----
+ // ANY: < 1, 1GB, 1, false > <----
+ // Availability:
+ // host_0_0: 8G
+ // host_0_1: 8G
+ // host_1_0: 8G
+ // host_1_1: 7G
+
+ a.assignContainers(clusterResource, node_1_0);
+ verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority));
+ assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+ }
+
@After
public void tearDown() throws Exception {
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1489087&r1=1489086&r2=1489087&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Mon Jun 3 18:13:53 2013
@@ -115,15 +115,16 @@ public class TestUtils {
}
public static ResourceRequest createResourceRequest(
- String hostName, int memory, int numContainers, Priority priority,
- RecordFactory recordFactory) {
+ String resourceName, int memory, int numContainers, boolean relaxLocality,
+ Priority priority, RecordFactory recordFactory) {
ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
Resource capability = Resources.createResource(memory, 1);
request.setNumContainers(numContainers);
- request.setResourceName(hostName);
+ request.setResourceName(resourceName);
request.setCapability(capability);
+ request.setRelaxLocality(relaxLocality);
request.setPriority(priority);
return request;
}