You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/06/11 05:33:58 UTC

hadoop git commit: YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/trunk e0f4620cc -> 5279af7cd


YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5279af7c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5279af7c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5279af7c

Branch: refs/heads/trunk
Commit: 5279af7cd4afb090da742a96b5786d9dee6224bc
Parents: e0f4620
Author: Arun Suresh <as...@apache.org>
Authored: Fri Jun 10 22:33:42 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Jun 10 22:33:42 2016 -0700

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            | 62 ++++++++++-------
 .../scheduler/fair/TestFairScheduler.java       | 72 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 5b83c9a..634f667 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -73,7 +73,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       = new DefaultResourceCalculator();
 
   private long startTime;
-  private Priority priority;
+  private Priority appPriority;
   private ResourceWeights resourceWeights;
   private Resource demand = Resources.createResource(0);
   private FairScheduler scheduler;
@@ -107,7 +107,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     this.scheduler = scheduler;
     this.startTime = scheduler.getClock().getTime();
-    this.priority = Priority.newInstance(1);
+    this.appPriority = Priority.newInstance(1);
     this.resourceWeights = new ResourceWeights();
   }
 
@@ -309,7 +309,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
 
     // default level is NODE_LOCAL
-    if (! allowedLocalityLevel.containsKey(priority)) {
+    if (!allowedLocalityLevel.containsKey(priority)) {
       // add the initial time of priority to prevent comparing with FsApp
       // startTime and allowedLocalityLevel degrade
       lastScheduledContainer.put(priority, currentTimeMs);
@@ -353,7 +353,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
       Priority priority, ResourceRequest request,
-      Container container) {
+      Container reservedContainer) {
     // Update allowed locality level
     NodeType allowed = allowedLocalityLevel.get(priority);
     if (allowed != null) {
@@ -373,9 +373,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     if (getTotalRequiredResources(priority) <= 0) {
       return null;
     }
+
+    Container container = reservedContainer;
+    if (container == null) {
+      container =
+          createContainer(node, request.getCapability(), request.getPriority());
+    }
     
     // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container, 
+    RMContainer rmContainer = new RMContainerImpl(container,
         getApplicationAttemptId(), node.getNodeID(),
         appSchedulingInfo.getUser(), rmContext);
     ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
@@ -485,21 +491,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * in {@link FSSchedulerNode}..
    * return whether reservation was possible with the current threshold limits
    */
-  private boolean reserve(Priority priority, FSSchedulerNode node,
-      Container container, NodeType type, boolean alreadyReserved) {
+  private boolean reserve(ResourceRequest request, FSSchedulerNode node,
+      Container reservedContainer, NodeType type) {
 
+    Priority priority = request.getPriority();
     if (!reservationExceedsThreshold(node, type)) {
       LOG.info("Making reservation: node=" + node.getNodeName() +
               " app_id=" + getApplicationId());
-      if (!alreadyReserved) {
-        getMetrics().reserveResource(getUser(), container.getResource());
+      if (reservedContainer == null) {
+        reservedContainer =
+            createContainer(node, request.getCapability(),
+              request.getPriority());
+        getMetrics().reserveResource(getUser(),
+            reservedContainer.getResource());
         RMContainer rmContainer =
-                super.reserve(node, priority, null, container);
+                super.reserve(node, priority, null, reservedContainer);
         node.reserveResource(this, priority, rmContainer);
         setReservation(node);
       } else {
         RMContainer rmContainer = node.getReservedContainer();
-        super.reserve(node, priority, rmContainer, container);
+        super.reserve(node, priority, rmContainer, reservedContainer);
         node.reserveResource(this, priority, rmContainer);
         setReservation(node);
       }
@@ -615,18 +626,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // How much does the node have?
     Resource available = node.getUnallocatedResource();
 
-    Container container = null;
+    Container reservedContainer = null;
     if (reserved) {
-      container = node.getReservedContainer().getContainer();
-    } else {
-      container = createContainer(node, capability, request.getPriority());
+      reservedContainer = node.getReservedContainer().getContainer();
     }
 
     // Can we allocate a container on this node?
     if (Resources.fitsIn(capability, available)) {
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
-          allocate(type, node, request.getPriority(), request, container);
+          allocate(type, node, request.getPriority(), request,
+              reservedContainer);
       if (allocatedContainer == null) {
         // Did the application need this resource?
         if (reserved) {
@@ -647,30 +657,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       // the AM. Set the amResource for this app and update the leaf queue's AM
       // usage
       if (!isAmRunning() && !getUnmanagedAM()) {
-        setAMResource(container.getResource());
-        getQueue().addAMResourceUsage(container.getResource());
+        setAMResource(capability);
+        getQueue().addAMResourceUsage(capability);
         setAmRunning(true);
       }
 
-      return container.getResource();
+      return capability;
     }
 
     // The desired container won't fit here, so reserve
-    if (isReservable(container) &&
-        reserve(request.getPriority(), node, container, type, reserved)) {
+    if (isReservable(capability) &&
+        reserve(request, node, reservedContainer, type)) {
       return FairScheduler.CONTAINER_RESERVED;
     } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not creating reservation as container " + container.getId()
-            + " is not reservable");
+        LOG.debug("Couldn't creating reservation for " +
+            getName() + ",at priority " +  request.getPriority());
       }
       return Resources.none();
     }
   }
 
-  private boolean isReservable(Container container) {
+  private boolean isReservable(Resource capacity) {
     return scheduler.isAtLeastReservationThreshold(
-      getQueue().getPolicy().getResourceCalculator(), container.getResource());
+        getQueue().getPolicy().getResourceCalculator(), capacity);
   }
 
   private boolean hasNodeOrRackLocalRequests(Priority priority) {
@@ -907,7 +917,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   public Priority getPriority() {
     // Right now per-app priorities are not passed to scheduler,
     // so everyone has the same priority.
-    return priority;
+    return appPriority;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 3e5a40f..ec77a9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -4480,4 +4480,76 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     resourceManager.getResourceScheduler().handle(nodeAddEvent1);
     return nm;
   }
+
+  @Test(timeout = 120000)
+  public void testContainerAllocationWithContainerIdLeap() throws Exception {
+    conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add two nodes
+    RMNode node1 = MockNodes.newNodeInfo(1,
+        Resources.createResource(3072, 10), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1,
+        Resources.createResource(3072, 10), 1, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(2048, "queue1", "user1", 2);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+
+    assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // container will be reserved at node1
+    RMContainer reservedContainer1 =
+        scheduler.getSchedulerNode(node1.getNodeID()).getReservedContainer();
+    assertNotEquals(reservedContainer1, null);
+    RMContainer reservedContainer2 =
+        scheduler.getSchedulerNode(node2.getNodeID()).getReservedContainer();
+    assertEquals(reservedContainer2, null);
+
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+      scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    }
+
+    // release resource
+    scheduler.handle(new AppAttemptRemovedSchedulerEvent(
+        app1, RMAppAttemptState.KILLED, false));
+
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // container will be allocated at node2
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    assertEquals(scheduler.getSchedulerApp(app2).
+        getLiveContainers().size(), 1);
+
+    long maxId = 0;
+    for (RMContainer container :
+        scheduler.getSchedulerApp(app2).getLiveContainers()) {
+      assertTrue(
+          container.getContainer().getNodeId().equals(node2.getNodeID()));
+      if (container.getContainerId().getContainerId() > maxId) {
+        maxId = container.getContainerId().getContainerId();
+      }
+    }
+
+    long reservedId = reservedContainer1.getContainerId().getContainerId();
+    assertEquals(reservedId + 1, maxId);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org