Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/06/29 19:35:12 UTC

[hadoop] branch branch-3.2 updated: YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new e6794f2  YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
e6794f2 is described below

commit e6794f2fc4d763459ce13ffa8db4c064bcb076dc
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Mon Jun 29 19:21:04 2020 +0000

    YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
---
 .../scheduler/capacity/AbstractCSQueue.java        |  10 +-
 .../scheduler/capacity/LeafQueue.java              |   3 +-
 .../allocator/RegularContainerAllocator.java       |  16 +-
 .../capacity/TestNodeLabelContainerAllocation.java | 272 +++++++++++++++++++++
 4 files changed, 283 insertions(+), 18 deletions(-)
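
In short: before this change, the capacity scheduler's "reservations continue
looking" optimization applied only to the default (empty) node partition, and
the code carried TODOs noting that labeled partitions were unsupported. This
commit removes those NO_LABEL guards in AbstractCSQueue, LeafQueue, and
RegularContainerAllocator, and has the headroom check read the application's
reserved resources for the specific partition being scheduled, so reservations
on labeled nodes can now be released when a request is satisfiable elsewhere.
Two new tests cover the scenario with an explicit label expression and with
the queue's default label expression.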

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 8603636..30e2cc8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1006,14 +1006,12 @@ public abstract class AbstractCSQueue implements CSQueue {
       if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
           usedExceptKillable, currentLimitResource)) {
 
-        // if reservation continous looking enabled, check to see if could we
+        // if reservation continue-looking is enabled, check to see if we could
         // potentially use this node instead of a reserved node if the application
         // has reserved containers.
-        // TODO, now only consider reservation cases when the node has no label
-        if (this.reservationsContinueLooking && nodePartition.equals(
-            RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
-            resourceCalculator, clusterResource, resourceCouldBeUnreserved,
-            Resources.none())) {
+        if (this.reservationsContinueLooking
+            && Resources.greaterThan(resourceCalculator, clusterResource,
+                resourceCouldBeUnreserved, Resources.none())) {
           // resource-without-reserved = used - reserved
           Resource newTotalWithoutReservedResource = Resources.subtract(
               usedExceptKillable, resourceCouldBeUnreserved);
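
To see what the hunk above does: when queue usage has reached its current
limit, assignment to this node may still proceed if releasing the
application's reserved containers would bring usage back under the limit; the
patch simply drops the requirement that the node partition be unlabeled. A
minimal sketch of that logic, using memory-only long values in place of
Hadoop's Resource/ResourceCalculator machinery (illustrative names, not the
actual AbstractCSQueue API):

    // Sketch of the relaxed queue-limit check (memory in MB; illustrative).
    final class QueueLimitSketch {
      static boolean canAssign(long usedExceptKillableMb, long limitMb,
          long couldBeUnreservedMb, boolean reservationsContinueLooking) {
        if (usedExceptKillableMb < limitMb) {
          return true; // plain headroom is enough
        }
        // Before YARN-9903 this branch also required the default (empty)
        // node partition; that guard is now gone.
        if (reservationsContinueLooking && couldBeUnreservedMb > 0) {
          // resource-without-reserved = used - reserved
          return usedExceptKillableMb - couldBeUnreservedMb < limitMb;
        }
        return false;
      }

      public static void main(String[] args) {
        // 15 GB used against a 15 GB limit is assignable only because 3 GB
        // of it is reserved and could be released first.
        System.out.println(canAssign(15 * 1024, 15 * 1024, 3 * 1024, true));
        System.out.println(canAssign(15 * 1024, 15 * 1024, 3 * 1024, false));
      }
    }

The next hunk applies the same relaxation to the per-user limit check in
LeafQueue.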
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6757340..3bedd0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1553,8 +1553,7 @@ public class LeafQueue extends AbstractCSQueue {
           user.getUsed(nodePartition), limit)) {
         // if enabled, check to see if we could potentially use this node instead
         // of a reserved node if the application has reserved containers
-        if (this.reservationsContinueLooking && nodePartition.equals(
-            CommonNodeLabelsManager.NO_LABEL)) {
+        if (this.reservationsContinueLooking) {
           if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
               Resources.subtract(user.getUsed(),
                   application.getCurrentReservation()), limit)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 5bf2a0d..9096cc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -77,12 +77,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       String nodePartition) {
     // If headroom + currentReservation < required, we cannot allocate this
     // request
-    Resource resourceCouldBeUnReserved = application.getCurrentReservation();
-    if (!application.getCSLeafQueue().getReservationContinueLooking()
-        || !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      // If we don't allow reservation continuous looking, OR we're looking at
-      // non-default node partition, we won't allow to unreserve before
-      // allocation.
+    Resource resourceCouldBeUnReserved =
+        application.getAppAttemptResourceUsage().getReserved(nodePartition);
+    if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+      // If reservation continue-looking is disabled,
+      // we won't allow unreserving before allocation.
       resourceCouldBeUnReserved = Resources.none();
     }
     return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
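
The hunk above is the core behavioral change: checkHeadroom previously zeroed
out the unreservable amount for any non-default partition, whereas it now
reads the application's reserved resources for the partition actually being
scheduled. A hedged, self-contained sketch of the new check (memory-only; the
names below are simplified stand-ins, not Hadoop's types):

    import java.util.Map;

    // Sketch of the per-partition headroom check after YARN-9903.
    final class HeadroomSketch {
      static boolean checkHeadroom(long headroomMb,
          Map<String, Long> reservedByPartitionMb, long requiredMb,
          boolean reservationContinueLooking, String nodePartition) {
        // Reserved resources are looked up for the requested partition
        // instead of being limited to the default (empty) label.
        long couldBeUnreserved = reservationContinueLooking
            ? reservedByPartitionMb.getOrDefault(nodePartition, 0L)
            : 0L; // feature disabled: never unreserve before allocating
        return headroomMb + couldBeUnreserved >= requiredMb;
      }

      public static void main(String[] args) {
        // Partition "x": 1 GB headroom, 3 GB reserved, 3 GB required.
        Map<String, Long> reserved = Map.of("x", 3072L);
        System.out.println(checkHeadroom(1024, reserved, 3072, true, "x"));
        System.out.println(checkHeadroom(1024, reserved, 3072, false, "x"));
      }
    }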
@@ -574,13 +573,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       // Allocate...
       // We will only do continuous reservation when this is not allocated from
       // reserved container
-      if (rmContainer == null && reservationsContinueLooking
-          && node.getLabels().isEmpty()) {
+      if (rmContainer == null && reservationsContinueLooking) {
         // when reservationsContinueLooking is set, we may need to unreserve
         // some containers to meet this queue, its parents', or the users'
         // resource limits.
-        // TODO, need change here when we want to support continuous reservation
-        // looking for labeled partitions.
         if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
           if (!needToUnreserve) {
             // If we shouldn't allocate/reserve new container then we should
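
This second hunk removes the node.getLabels().isEmpty() guard (and its TODO),
so the unreserve-before-allocate path can now run on labeled nodes as well. A
rough sketch of the decision it gates, assuming the surrounding logic is
otherwise unchanged (parameter names are illustrative):

    // Sketch of the continue-looking unreserve decision.
    final class UnreserveDecisionSketch {
      static boolean shouldTryUnreserve(boolean allocatingFromReserved,
          boolean reservationsContinueLooking,
          boolean shouldAllocOrReserveNew, boolean needToUnreserve) {
        // Only consider unreserving when this allocation is not itself
        // being satisfied from an already-reserved container (rmContainer
        // == null in the real code) and the feature is enabled.
        if (allocatingFromReserved || !reservationsContinueLooking) {
          return false;
        }
        // Unreserve when queue/user limits force it, or when no new
        // container may be allocated or reserved on this node.
        return !shouldAllocOrReserveNew || needToUnreserve;
      }

      public static void main(String[] args) {
        // A fresh allocation that has hit its limits should try unreserving.
        System.out.println(shouldTryUnreserve(false, true, false, true));
      }
    }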
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 144f11f..7b1fce7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -553,6 +553,278 @@ public class TestNodeLabelContainerAllocation {
   }
 
   @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithLabels()
+      throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x)
+    RMApp app1 = rm1.submitApp(2 * GB, "app1", "user", null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), "x");
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), "x");
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
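
For readers following the first test above, the memory accounting it asserts
works out as below (8 GB nodes, everything on partition "x"; note that the
queue's "used" figure includes reserved resources, as the assertions show):

    // Walk-through of the first test's arithmetic (GB units).
    final class ReservationMathSketch {
      public static void main(String[] args) {
        int node1Used = 2 + 5;   // AM + first 5 GB mapper on node1
        int node2Used = 5;       // second 5 GB mapper on node2
        int reserved = 3;        // a 3 GB reducer can't fit node1's 1 GB,
                                 // so it is reserved there instead
        System.out.println("after reserve: used = "
            + (node1Used + node2Used + reserved) + ", reserved = " + reserved);
        // Heartbeat on node2: the reducer fits (5 + 3 <= 8), and with
        // continue-looking now active on "x" the node1 reservation is
        // released rather than left dangling.
        node2Used += 3;
        reserved = 0;
        System.out.println("after unreserve: used = "
            + (node1Used + node2Used) + ", reserved = " + reserved);
        // Matches the test: 15 GB used / 3 GB reserved, then 15 GB / 0 GB.
      }
    }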
+  @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithDefaultLabels()
+      throws Exception {
+    // This is the same as testContainerReservationContinueLookingWithLabels,
+    // but this test doesn't specify a label expression in the
+    // ResourceRequest; instead it relies on the queue's default label expression.
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x)
+    RMApp app1 = rm1.submitApp(2 * GB, "app1", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), null);
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), null);
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 120000)
   public void testRMContainerLeakInLeafQueue() throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org