You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2020/06/01 12:42:11 UTC
[hadoop] 01/01: YARN-10259. Fix reservation logic in Multi Node
Placement.
This is an automated email from the ASF dual-hosted git repository.
prabhujoseph pushed a commit to reference refs/for/branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit b8f0e6e096210d4bc8c356d755e06c237926332d
Author: Prabhu Joseph <pj...@apache.org>
AuthorDate: Thu May 7 17:47:51 2020 +0530
YARN-10259. Fix reservation logic in Multi Node Placement.
Reviewed by Wangda Tan.
(cherry picked from commit 6ce295b78737aca8103912121d54f318cb5d36ef)
---
.../scheduler/capacity/LeafQueue.java | 14 ++-
.../allocator/RegularContainerAllocator.java | 18 +++-
.../capacity/TestCapacitySchedulerMultiNodes.java | 110 ++++++++++++++++++++-
3 files changed, 130 insertions(+), 12 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ebf9cf6..9f0caf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1008,11 +1008,15 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
- // Considering multi-node scheduling, its better to iterate through
- // all candidates and stop once we get atleast one good node to allocate
- // where reservation was made earlier. In normal case, there is only one
- // node and hence there wont be any impact after this change.
- for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+
+ // Irrespective of single- or multi-node placement, allocation from a
+ // reserved container must be attempted only on the single node that
+ // CapacityScheduler#allocateFromReservedContainer passes in.
+ // Otherwise, in multi-node placement, no new container would be
+ // allocated or reserved while there is a RESERVED container on
+ // a node that is full.
+ FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+ if (node != null) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp application = getApplication(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1dacc96..287dc67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -837,6 +837,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Do checks before determining which node to allocate
// Directly return if this check fails.
ContainerAllocation result;
+ ContainerAllocation lastReservation = null;
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -878,11 +879,24 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
result = tryAllocateOnNode(clusterResource, node, schedulingMode,
resourceLimits, schedulerKey, reservedContainer);
- if (AllocationState.ALLOCATED == result.getAllocationState()
- || AllocationState.RESERVED == result.getAllocationState()) {
+ if (AllocationState.ALLOCATED == result.getAllocationState()) {
result = doAllocation(result, node, schedulerKey, reservedContainer);
break;
}
+
+ // In multi-node placement, try to allocate on the remaining nodes
+ // from the iterator before reserving. Otherwise no new container
+ // would be allocated whenever the first node in the iterator
+ // cannot fit the request and returns a RESERVED allocation.
+ if (AllocationState.RESERVED == result.getAllocationState()) {
+ lastReservation = result;
+ if (iter.hasNext()) {
+ continue;
+ } else {
+ result = doAllocation(lastReservation, node, schedulerKey,
+ reservedContainer);
+ }
+ }
}
return result;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
index fa85ca7..bb2cbfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -223,6 +224,7 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
@@ -234,12 +236,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
* after its ask has been cancelled when used capacity of root queue is 1.
*/
// Ask a container with 6GB memory size for app1,
- // nm1 will reserve a container for app1
+ // nm2 will reserve a container for app1, because the reservation
+ // is made on the last node returned by the node iterator
am1.allocate("*", 6 * GB, 1, new ArrayList<>());
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Check containers of app1 and app2.
- Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
@@ -324,12 +327,13 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
* after node has sufficient resource.
*/
// Ask a container with 6GB memory size for app2,
- // nm1 will reserve a container for app2
+ // nm2 will reserve a container for app2, because the reservation
+ // is made on the last node returned by the node iterator
am2.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check containers of app1 and app2.
- Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+ Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
Assert.assertEquals(1, schedulerApp2.getReservedContainers().size());
@@ -344,4 +348,100 @@ public class TestCapacitySchedulerMultiNodes extends CapacitySchedulerTestBase {
rm1.close();
}
+
+  /**
+   * Checks that an AM container which is RESERVED on one node gets
+   * allocated on the other node once space is freed there, and that the
+   * reservation is released afterwards.
+   *
+   * Two 12GB nodes each run an 8GB AM (app1 on nm1, app2 on nm2), so the
+   * 8GB AM of app3 is expected to be reserved rather than allocated. The
+   * app running on the node that does not hold the reservation is then
+   * killed to free space on that node.
+   *
+   * @throws Exception if the mock cluster setup or scheduling fails
+   */
+  @Test(timeout=30000)
+  public void testAllocateOfReservedContainerFromAnotherNode()
+      throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        1.0f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
+
+    // launch an app1 to queue, AM container will be launched in nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app2 to queue, AM container will be launched in nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Reserve a Container for app3
+    RMApp app3 = MockRMAppSubmitter.submit(rm1,
+        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
+            .withAppName("app")
+            .withUser("user")
+            .withAcls(null)
+            .withQueue("default")
+            .build());
+
+    // The AM launch of app3 is driven from a helper thread because it is
+    // expected to stay pending while the AM container is only reserved.
+    // Any failure in the helper thread is recorded and re-raised on the
+    // test thread below: calling Assert.fail inside the helper thread
+    // would only terminate that thread and never fail the test.
+    final AtomicBoolean result = new AtomicBoolean(false);
+    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          MockRM.launchAndRegisterAM(app3, rm1, nm1);
+          result.set(true);
+        } catch (Throwable e) {
+          failure.set(e);
+        }
+      }
+    };
+    // Daemon, so a stuck launch cannot keep the JVM alive after the test.
+    t.setDaemon(true);
+    t.start();
+    Thread.sleep(1000);
+
+    // Validate if app3 has got RESERVED container
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to get reserved container", 1,
+        schedulerApp.getReservedContainers().size());
+
+    // Free the Space on other node where Reservation has not happened
+    if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
+      rm1.killApp(app2.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    } else {
+      rm1.killApp(app1.getApplicationId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // Check if Reserved AM of app3 gets allocated in
+    // node where space available. join() returns as soon as the helper
+    // thread ends, whether it succeeded or failed; the overall wait is
+    // bounded by the @Test timeout.
+    t.join();
+    if (failure.get() != null) {
+      throw new AssertionError(
+          "Failed to allocate the reserved container", failure.get());
+    }
+    Assert.assertTrue("AM of app3 was not launched", result.get());
+
+    // Validate release of reserved containers
+    schedulerApp =
+        cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
+    Assert.assertEquals("App3 failed to release Reserved container", 0,
+        schedulerApp.getReservedContainers().size());
+    Assert.assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
+    Assert.assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
+
+    rm1.close();
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org