Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2020/06/01 12:42:10 UTC

[hadoop] reference refs/for/branch-3.3 created (now b8f0e6e)

This is an automated email from the ASF dual-hosted git repository.

prabhujoseph pushed a change to reference refs/for/branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


      at b8f0e6e  YARN-10259. Fix reservation logic in Multi Node Placement.

This reference includes the following new commits:

     new b8f0e6e  YARN-10259. Fix reservation logic in Multi Node Placement.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.





[hadoop] 01/01: YARN-10259. Fix reservation logic in Multi Node Placement.

Posted by pr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

prabhujoseph pushed a commit to reference refs/for/branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b8f0e6e096210d4bc8c356d755e06c237926332d
Author: Prabhu Joseph <pj...@apache.org>
AuthorDate: Thu May 7 17:47:51 2020 +0530

    YARN-10259. Fix reservation logic in Multi Node Placement.
    
    Reviewed by Wangda Tan.
    
    (cherry picked from commit 6ce295b78737aca8103912121d54f318cb5d36ef)
---
 .../scheduler/capacity/LeafQueue.java              |  14 ++-
 .../allocator/RegularContainerAllocator.java       |  18 +++-
 .../capacity/TestCapacitySchedulerMultiNodes.java  | 110 ++++++++++++++++++++-
 3 files changed, 130 insertions(+), 12 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ebf9cf6..9f0caf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1008,11 +1008,15 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment allocateFromReservedContainer(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    // Considering multi-node scheduling, its better to iterate through
-    // all candidates and stop once we get atleast one good node to allocate
-    // where reservation was made earlier. In normal case, there is only one
-    // node and hence there wont be any impact after this change.
-    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+
+    // Irrespective of single or multi node placement, allocation from a
+    // reserved container must happen only for the single node that
+    // CapacityScheduler#allocateFromReservedContainer was invoked with.
+    // Otherwise, in multi node placement, no new containers would be
+    // allocated or reserved while a RESERVED container sits on a node
+    // that is full.
+    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+    if (node != null) {
       RMContainer reservedContainer = node.getReservedContainer();
       if (reservedContainer != null) {
         FiCaSchedulerApp application = getApplication(
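
The hunk above replaces the candidate-set iteration with a single-node
guard. A minimal standalone sketch of that shape, using placeholder types
rather than the real YARN classes (getSingleNode's one-node-or-null
behavior is an assumption inferred from its usage in this hunk):

    import java.util.Map;

    // Simplified model of the guard above. All types here are
    // placeholders, not the real YARN classes; only the control flow
    // mirrors the change.
    class ReservedAllocationGuardSketch {

      interface Node {
        Object getReservedContainer(); // null when nothing is reserved here
      }

      // Mirrors the assumed CandidateNodeSetUtils.getSingleNode contract:
      // returns the node when the candidate set holds exactly one entry,
      // null otherwise.
      static Node getSingleNode(Map<String, Node> candidates) {
        return candidates.size() == 1
            ? candidates.values().iterator().next() : null;
      }

      // Allocation from a reservation proceeds only in the single-node,
      // reservation-present case; a multi-node candidate set is skipped.
      static boolean mayAllocateFromReservation(Map<String, Node> candidates) {
        Node node = getSingleNode(candidates);
        return node != null && node.getReservedContainer() != null;
      }
    }
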
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1dacc96..287dc67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -837,6 +837,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
+    ContainerAllocation lastReservation = null;
 
     AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
         application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -878,11 +879,24 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 
-      if (AllocationState.ALLOCATED == result.getAllocationState()
-          || AllocationState.RESERVED == result.getAllocationState()) {
+      if (AllocationState.ALLOCATED == result.getAllocationState()) {
         result = doAllocation(result, node, schedulerKey, reservedContainer);
         break;
       }
+
+      // In multi node placement, try to allocate on the other available
+      // nodes from the iterator as well before reserving. Otherwise no
+      // new containers would be allocated when the first node in the
+      // iterator cannot fit the request and returns RESERVED.
+      if (AllocationState.RESERVED == result.getAllocationState()) {
+        lastReservation = result;
+        if (iter.hasNext()) {
+          continue;
+        } else {
+          result = doAllocation(lastReservation, node, schedulerKey,
+              reservedContainer);
+        }
+      }
     }
 
     return result;
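
The change above boils down to: try every candidate node for a full
allocation first, and accept a RESERVED outcome only once the iterator is
exhausted. A minimal standalone sketch of that loop, again with
placeholder types and names in place of the real allocator classes:

    import java.util.Iterator;
    import java.util.List;

    // Simplified model of the allocator loop above; types and names are
    // placeholders, not the real YARN classes.
    class MultiNodeAllocationSketch {

      enum State { ALLOCATED, RESERVED, SKIPPED }

      interface Node {
        State tryAllocate(int memoryMb); // stand-in for tryAllocateOnNode
      }

      // Returns the node that allocated, or the node holding the fallback
      // reservation once every candidate has been tried, or null.
      static Node allocate(List<Node> candidates, int memoryMb) {
        for (Iterator<Node> iter = candidates.iterator(); iter.hasNext(); ) {
          Node node = iter.next();
          State state = node.tryAllocate(memoryMb);
          if (state == State.ALLOCATED) {
            return node;   // real code calls doAllocation(...) and breaks
          }
          if (state == State.RESERVED && !iter.hasNext()) {
            return node;   // iterator exhausted: accept the reservation
          }
          // RESERVED with nodes remaining: keep trying other candidates
        }
        return null;       // nothing allocated or reserved
      }
    }
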
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index fa85ca7..bb2cbfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -223,6 +224,7 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
 
     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
     FiCaSchedulerApp schedulerApp1 =
         cs.getApplicationAttempt(am1.getApplicationAttemptId());
@@ -234,12 +236,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
      * after its ask has been cancelled when used capacity of root queue is 1.
      */
     // Ask a container with 6GB memory size for app1,
-    // nm1 will reserve a container for app1
+    // nm2 will reserve a container for app1;
+    // the last node from the node iterator gets the RESERVED container
     am1.allocate("*", 6 * GB, 1, new ArrayList<>());
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
@@ -324,12 +327,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
      * after node has sufficient resource.
      */
     // Ask a container with 6GB memory size for app2,
-    // nm1 will reserve a container for app2
+    // nm2 will reserve a container for app2;
+    // the last node from the node iterator gets the RESERVED container
     am2.allocate("*", 6 * GB, 1, new ArrayList<>());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
 
     // Check containers of app1 and app2.
-    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
     Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
     Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
@@ -344,4 +348,100 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
 
     rm1.close();
   }
+
+  @Test(timeout=30000)
+  public void testAllocateOfReservedContainerFromAnotherNode()
+      throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        1.0f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
+
+    // launch app1 to the default queue; its AM container starts on nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch app2 to the default queue; its AM container starts on nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Reserve a Container for app3
+    RMApp app3 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+          result.set(true);
+        } catch (Exception e) {
+          Assert.fail("Failed to allocate the reserved container");
+        }
+      }
+    };
+    t.start();
+    Thread.sleep(1000);
+
+    // Validate that app3 got a RESERVED container
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to get reserved container", 1,
+        schedulerApp.getReservedContainers().size());
+
+    // Free up space on the other node, where the reservation did not happen
+    if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      rm1.killApp(app2.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    } else {
+      rm1.killApp(app1.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // Wait until the reserved AM container of app3 gets allocated on
+    // the node where space became available
+    while (!result.get()) {
+      Thread.sleep(100);
+    }
+
+    // Validate release of reserved containers
+    schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to release Reserved container", 0,
+        schedulerApp.getReservedContainers().size());
+    Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
+    Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+
+    rm1.close();
+  }
 }
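
A side note on the new test: it polls the AtomicBoolean with Thread.sleep
in a loop, bounded only by the JUnit timeout. A bounded-wait sketch using
GenericTestUtils.waitFor from hadoop-common's test utilities (assuming
that helper is available on this module's test classpath):

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.test.GenericTestUtils;

    // Bounded-wait variant of the polling loop in the test above: poll
    // result every 100 ms and fail with TimeoutException after 30 s
    // instead of relying solely on the JUnit timeout.
    class WaitForSketch {
      static void awaitAllocation(AtomicBoolean result)
          throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(result::get, 100, 30000);
      }
    }
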


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org