You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/09/15 00:47:51 UTC

svn commit: r1170880 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/had...

Author: acmurthy
Date: Wed Sep 14 22:47:50 2011
New Revision: 1170880

URL: http://svn.apache.org/viewvc?rev=1170880&view=rev
Log:
Merge -r 1170878:1170879 from trunk to branch-0.23 to fix MAPREDUCE-3005.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Sep 14 22:47:50 2011
@@ -1295,6 +1295,9 @@ Release 0.23.0 - Unreleased
    MAPREDUCE-2949. Fixed NodeManager to shut-down correctly if a service
    startup fails. (Ravi Teja via vinodkv)
 
+   MAPREDUCE-3005. Fix both FifoScheduler and CapacityScheduler to correctly
+   enforce locality constraints. (acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Sep 14 22:47:50 2011
@@ -1023,21 +1023,17 @@ public class LeafQueue implements CSQueu
     // Check if we need containers on this rack 
     ResourceRequest rackLocalRequest = 
       application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+      
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
     if (type == NodeType.RACK_LOCAL) {
-      if (rackLocalRequest == null) {
-        return false;
-      } else {
-        return rackLocalRequest.getNumContainers() > 0;      
-      }
+      return true;
     }
 
     // Check if we need containers on this host
     if (type == NodeType.NODE_LOCAL) {
-      // First: Do we need containers on this rack?
-      if (rackLocalRequest != null && rackLocalRequest.getNumContainers() == 0) {
-        return false;
-      }
-      
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest = 
         application.getResourceRequest(priority, node.getHostName());

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Wed Sep 14 22:47:50 2011
@@ -289,6 +289,7 @@ public class FifoScheduler implements Re
     return nodes.get(nodeId);
   }
   
+  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
     // TODO: Fix store
@@ -440,6 +441,14 @@ public class FifoScheduler implements Re
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getNodeAddress());
     if (request != null) {
+      // Don't allocate on this node if we don't need containers on this rack
+      ResourceRequest rackRequest =
+          application.getResourceRequest(priority, 
+              node.getRMNode().getRackName());
+      if (rackRequest == null || rackRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+      
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 
@@ -458,6 +467,13 @@ public class FifoScheduler implements Re
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRMNode().getRackName());
     if (request != null) {
+      // Don't allocate on this rack if the application doesn't need containers
+      ResourceRequest offSwitchRequest =
+          application.getResourceRequest(priority, SchedulerNode.ANY);
+      if (offSwitchRequest.getNumContainers() <= 0) {
+        return 0;
+      }
+      
       int assignableContainers = 
         Math.min(
             getMaxAllocatableContainers(application, priority, node, 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1170880&r1=1170879&r2=1170880&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Sep 14 22:47:50 2011
@@ -625,7 +625,6 @@ public class TestLeafQueue {
   }
   
   
-  
   @Test
   public void testLocalityScheduling() throws Exception {
 
@@ -876,6 +875,107 @@ public class TestLeafQueue {
 
   }
   
+  @Test
+  public void testSchedulingConstraints() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        spy(new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null));
+    a.submitApplication(app_0, user_0, A);
+    
+    // Setup some nodes and racks
+    String host_0_0 = "host_0_0";
+    String rack_0 = "rack_0";
+    SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+    String host_0_1 = "host_0_1";
+    SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+    
+    
+    String host_1_0 = "host_1_0";
+    String rack_1 = "rack_1";
+    SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+    
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests and submit
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
+            priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // Start testing...
+    
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    
+    // NODE_LOCAL - node_0_0
+    a.assignContainers(clusterResource, node_0_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // No allocation on node_1_0 even though it's node/rack local since
+    // required(ANY) == 0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
+                                                               // since #req=0
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+    
+    // Add one request
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one 
+            priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    // No allocation on node_0_1 even though it's node/rack local since
+    // required(rack_0) == 0
+    a.assignContainers(clusterResource, node_0_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
+    assertEquals(1, app_0.getTotalRequiredResources(priority));
+    
+    // NODE_LOCAL - node_1_0
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+  }
+  
   @After
   public void tearDown() throws Exception {
   }