You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/09/15 00:47:51 UTC
svn commit: r1170880 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/
hadoop-yarn/had...
Author: acmurthy
Date: Wed Sep 14 22:47:50 2011
New Revision: 1170880
URL: http://svn.apache.org/viewvc?rev=1170880&view=rev
Log:
Merge -r 1170878:1170879 from trunk to branch-0.23 to fix MAPREDUCE-3005.
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Sep 14 22:47:50 2011
@@ -1295,6 +1295,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
startup fails. (Ravi Teja via vinodkv)
+ MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly
+ enforce locality constraints. (acmurthy)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Sep 14 22:47:50 2011
@@ -1023,21 +1023,17 @@ public class LeafQueue implements CSQueu
// Check if we need containers on this rack
ResourceRequest rackLocalRequest =
application.getResourceRequest(priority, node.getRackName());
+ if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+ return false;
+ }
+
+ // If we are here, we do need containers on this rack for RACK_LOCAL req
if (type == NodeType.RACK_LOCAL) {
- if (rackLocalRequest == null) {
- return false;
- } else {
- return rackLocalRequest.getNumContainers() > 0;
- }
+ return true;
}
// Check if we need containers on this host
if (type == NodeType.NODE_LOCAL) {
- // First: Do we need containers on this rack?
- if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
- return false;
- }
-
// Now check if we need containers on this host...
ResourceRequest nodeLocalRequest =
application.getResourceRequest(priority, node.getHostName());
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Sep 14 22:47:50 2011
@@ -289,6 +289,7 @@ public class FifoScheduler implements Re
return nodes.get(nodeId);
}
+ @SuppressWarnings("unchecked")
private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String queueName, String user) {
// TODO: Fix store
@@ -440,6 +441,14 @@ public class FifoScheduler implements Re
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
if (request != null) {
+ // Don't allocate on this node if we don't need containers on this rack
+ ResourceRequest rackRequest =
+ application.getResourceRequest(priority,
+ node.getRMNode().getRackName());
+ if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+ return 0;
+ }
+
int assignableContainers =
Math.min(
getMaxAllocatableContainers(application, priority, node,
@@ -458,6 +467,13 @@ public class FifoScheduler implements Re
ResourceRequest request =
application.getResourceRequest(priority, node.getRMNode().getRackName());
if (request != null) {
+ // Don't allocate on this rack if the application doens't need containers
+ ResourceRequest offSwitchRequest =
+ application.getResourceRequest(priority, SchedulerNode.ANY);
+ if (offSwitchRequest.getNumContainers() <= 0) {
+ return 0;
+ }
+
int assignableContainers =
Math.min(
getMaxAllocatableContainers(application, priority, node,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Sep 14 22:47:50 2011
@@ -625,7 +625,6 @@ public class TestLeafQueue {
}
-
@Test
public void testLocalityScheduling() throws Exception {
@@ -876,6 +875,107 @@ public class TestLeafQueue {
}
+ @Test
+ public void testSchedulingConstraints() throws Exception {
+
+ // Manipulate queue 'a'
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+ // User
+ String user_0 = "user_0";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ SchedulerApp app_0 =
+ spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+ a.submitApplication(app_0, user_0, A);
+
+ // Setup some nodes and racks
+ String host_0_0 = "host_0_0";
+ String rack_0 = "rack_0";
+ SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+ String host_0_1 = "host_0_1";
+ SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+
+
+ String host_1_0 = "host_1_0";
+ String rack_1 = "rack_1";
+ SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+
+ final int numNodes = 3;
+ Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ // Setup resource-requests and submit
+ Priority priority = TestUtils.createMockPriority(1);
+ List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(host_0_0, 1*GB, 1,
+ priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(host_0_1, 1*GB, 1,
+ priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(rack_0, 1*GB, 1,
+ priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(host_1_0, 1*GB, 1,
+ priority, recordFactory));
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(rack_1, 1*GB, 1,
+ priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ // Start testing...
+
+ // Add one request
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+ priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ // NODE_LOCAL - node_0_1
+ a.assignContainers(clusterResource, node_0_0);
+ verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+ assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+ // No allocation on node_1_0 even though it's node/rack local since
+ // required(ANY) == 0
+ a.assignContainers(clusterResource, node_1_0);
+ verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
+ // since #req=0
+ assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+ // Add one request
+ app_0_requests_0.clear();
+ app_0_requests_0.add(
+ TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
+ priority, recordFactory));
+ app_0.updateResourceRequests(app_0_requests_0);
+
+ // No allocation on node_0_1 even though it's node/rack local since
+ // required(rack_1) == 0
+ a.assignContainers(clusterResource, node_0_1);
+ verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(1, app_0.getSchedulingOpportunities(priority));
+ assertEquals(1, app_0.getTotalRequiredResources(priority));
+
+ // NODE_LOCAL - node_1
+ a.assignContainers(clusterResource, node_1_0);
+ verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
+ any(Priority.class), any(ResourceRequest.class), any(Container.class));
+ assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+ assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+ }
+
@After
public void tearDown() throws Exception {
}