You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/04/25 11:18:25 UTC

svn commit: r1475682 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ hadoop-yarn/hadoop-yarn-ser...

Author: tomwhite
Date: Thu Apr 25 09:18:24 2013
New Revision: 1475682

URL: http://svn.apache.org/r1475682
Log:
Merge -r 1475680:1475681 from trunk to branch-2. Fixes:  YARN-289. Fair scheduler allows reservations that won't fit on node. Contributed by Sandy Ryza.

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Apr 25 09:18:24 2013
@@ -235,6 +235,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-605. Fix failing unit test in TestNMWebServices when versionInfo has
     parantheses like when running on a git checkout. (Hitesh Shah via vinodkv)
 
+    YARN-289. Fair scheduler allows reservations that won't fit on node.
+    (Sandy Ryza via tomwhite)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Thu Apr 25 09:18:24 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.util.Build
 @Private
 @Unstable
 public class AppSchedulable extends Schedulable {
+  private static final DefaultResourceCalculator RESOURCE_CALCULATOR
+    = new DefaultResourceCalculator();
+  
   private FairScheduler scheduler;
   private FSSchedulerApp app;
   private Resource demand = Resources.createResource(0);
@@ -180,15 +184,15 @@ public class AppSchedulable extends Sche
    * update relevant bookeeping. This dispatches ro relevant handlers
    * in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
    */
-  private void reserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node, Container container, boolean alreadyReserved) {
+  private void reserve(Priority priority, FSSchedulerNode node,
+      Container container, boolean alreadyReserved) {
     LOG.info("Making reservation: node=" + node.getHostName() +
                                  " app_id=" + app.getApplicationId());
     if (!alreadyReserved) {
-      getMetrics().reserveResource(application.getUser(), container.getResource());
-      RMContainer rmContainer = application.reserve(node, priority, null,
+      getMetrics().reserveResource(app.getUser(), container.getResource());
+      RMContainer rmContainer = app.reserve(node, priority, null,
           container);
-      node.reserveResource(application, priority, rmContainer);
+      node.reserveResource(app, priority, rmContainer);
       getMetrics().reserveResource(app.getUser(),
           container.getResource());
       scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
@@ -197,25 +201,24 @@ public class AppSchedulable extends Sche
 
     else {
       RMContainer rmContainer = node.getReservedContainer();
-      application.reserve(node, priority, rmContainer, container);
-      node.reserveResource(application, priority, rmContainer);
+      app.reserve(node, priority, rmContainer, container);
+      node.reserveResource(app, priority, rmContainer);
     }
   }
 
   /**
-   * Remove the reservation on {@code node} for {@ application} at the given
+   * Remove the reservation on {@code node} at the given
    * {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
    * handlers for an unreservation.
    */
-  private void unreserve(FSSchedulerApp application, Priority priority,
-      FSSchedulerNode node) {
+  public void unreserve(Priority priority, FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
+    app.unreserve(node, priority);
+    node.unreserveResource(app);
     getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
     scheduler.getRootQueueMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+        app.getUser(), rmContainer.getContainer().getResource());
   }
 
   /**
@@ -224,8 +227,8 @@ public class AppSchedulable extends Sche
    * sure the particular request should be facilitated by this node.
    */
   private Resource assignContainer(FSSchedulerNode node,
-      FSSchedulerApp application, Priority priority,
-      ResourceRequest request, NodeType type, boolean reserved) {
+      Priority priority, ResourceRequest request, NodeType type,
+      boolean reserved) {
 
     // How much does this request need?
     Resource capability = request.getCapability();
@@ -237,7 +240,7 @@ public class AppSchedulable extends Sche
     if (reserved) {
       container = node.getReservedContainer().getContainer();
     } else {
-      container = createContainer(application, node, capability, priority);
+      container = createContainer(app, node, capability, priority);
     }
 
     // Can we allocate a container on this node?
@@ -247,9 +250,12 @@ public class AppSchedulable extends Sche
     if (availableContainers > 0) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          application.allocate(type, node, priority, request, container);
+          app.allocate(type, node, priority, request, container);
       if (allocatedContainer == null) {
         // Did the application need this resource?
+        if (reserved) {
+          unreserve(priority, node);
+        }
         return Resources.none();
       }
       else {
@@ -262,17 +268,17 @@ public class AppSchedulable extends Sche
 
       // If we had previously made a reservation, delete it
       if (reserved) {
-        unreserve(application, priority, node);
+        unreserve(priority, node);
       }
 
       // Inform the node
-      node.allocateContainer(application.getApplicationId(),
+      node.allocateContainer(app.getApplicationId(),
           allocatedContainer);
 
       return container.getResource();
     } else {
       // The desired container won't fit here, so reserve
-      reserve(application, priority, node, container, reserved);
+      reserve(priority, node, container, reserved);
 
       return FairScheduler.CONTAINER_RESERVED;
     }
@@ -287,7 +293,7 @@ public class AppSchedulable extends Sche
 
       // Make sure the application still needs requests at this priority
       if (app.getTotalRequiredResources(priority) == 0) {
-        unreserve(app, priority, node);
+        unreserve(priority, node);
         return Resources.none();
       }
     } else {
@@ -304,7 +310,8 @@ public class AppSchedulable extends Sche
     // (not scheduled) in order to promote better locality.
     synchronized (app) {
       for (Priority priority : prioritiesToTry) {
-        if (app.getTotalRequiredResources(priority) <= 0) {
+        if (app.getTotalRequiredResources(priority) <= 0 ||
+            !hasContainerForNode(priority, node)) {
           continue;
         }
         
@@ -321,14 +328,14 @@ public class AppSchedulable extends Sche
         
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && localRequest != null && localRequest.getNumContainers() != 0) {
-          return assignContainer(node, app, priority,
+          return assignContainer(node, priority,
               localRequest, NodeType.NODE_LOCAL, reserved);
         }
 
         if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
             && (allowedLocality.equals(NodeType.RACK_LOCAL) ||
                 allowedLocality.equals(NodeType.OFF_SWITCH))) {
-          return assignContainer(node, app, priority, rackLocalRequest,
+          return assignContainer(node, priority, rackLocalRequest,
               NodeType.RACK_LOCAL, reserved);
         }
 
@@ -336,7 +343,7 @@ public class AppSchedulable extends Sche
             ResourceRequest.ANY);
         if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
             && allowedLocality.equals(NodeType.OFF_SWITCH)) {
-          return assignContainer(node, app, priority, offSwitchRequest,
+          return assignContainer(node, priority, offSwitchRequest,
               NodeType.OFF_SWITCH, reserved);
         }
       }
@@ -352,4 +359,16 @@ public class AppSchedulable extends Sche
   public Resource assignContainer(FSSchedulerNode node) {
     return assignContainer(node, false);
   }
+  
+  /**
+   * Whether this app has container requests that could be satisfied on the
+   * given node, if the node had full space.
+   */
+  public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
+    // TODO: add checks for node-specific scheduling here
+    ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
+    return request.getNumContainers() > 0 && 
+        Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
+            request.getCapability(), node.getRMNode().getTotalCapability());
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Apr 25 09:18:24 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -805,14 +806,25 @@ public class FairScheduler implements Re
 
     AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
-      // Reservation exists; try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application "
-          + reservedAppSchedulable.getApp().getApplicationAttemptId()
-          + " on node: " + nm);
+      Priority reservedPriority = node.getReservedContainer().getReservedPriority();
+      if (reservedAppSchedulable != null &&
+          !reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+        // Don't hold the reservation if app can no longer use it
+        LOG.info("Releasing reservation that cannot be satisfied for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node " + nm);
+        reservedAppSchedulable.unreserve(reservedPriority, node);
+        reservedAppSchedulable = null;
+      } else {
+        // Reservation exists; try to fulfill the reservation
+        LOG.info("Trying to fulfill reservation for application "
+            + reservedAppSchedulable.getApp().getApplicationAttemptId()
+            + " on node: " + nm);
 
-      node.getReservedAppSchedulable().assignReservedContainer(node);
+        node.getReservedAppSchedulable().assignReservedContainer(node);
+      }
     }
-    else {
+    if (reservedAppSchedulable == null) {
       // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1475682&r1=1475681&r2=1475682&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Apr 25 09:18:24 2013
@@ -1520,4 +1520,28 @@ public class TestFairScheduler {
     }
     assertEquals(FinalApplicationStatus.FAILED, application.getFinalApplicationStatus());
   }
+  
+  @Test
+  public void testReservationThatDoesntFit() {
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    
+    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1",
+        "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+    
+    FSSchedulerApp app = scheduler.applications.get(attId);
+    assertEquals(0, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+    
+    createSchedulingRequestExistingApplication(1024, 2, attId);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+    
+    assertEquals(1, app.getLiveContainers().size());
+    assertEquals(0, app.getReservedContainers().size());
+  }
 }