You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/06/29 18:56:55 UTC
[hadoop] branch trunk updated: YARN-9903: Support reservations
continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74fc13c YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
74fc13c is described below
commit 74fc13cf91818a70f434401244f7560c4db3a676
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Mon Jun 29 18:39:53 2020 +0000
YARN-9903: Support reservations continue looking for Node Labels. Contributed by Jim Brennan (Jim_Brennan).
---
.../scheduler/capacity/AbstractCSQueue.java | 10 +-
.../scheduler/capacity/LeafQueue.java | 3 +-
.../allocator/RegularContainerAllocator.java | 16 +-
.../capacity/TestNodeLabelContainerAllocation.java | 289 +++++++++++++++++++++
4 files changed, 300 insertions(+), 18 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 968d971..f1467a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1076,14 +1076,12 @@ public abstract class AbstractCSQueue implements CSQueue {
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
usedExceptKillable, currentLimitResource)) {
- // if reservation continous looking enabled, check to see if could we
+ // if reservation continue looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
// has reserved containers.
- // TODO, now only consider reservation cases when the node has no label
- if (this.reservationsContinueLooking && nodePartition.equals(
- RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
- resourceCalculator, clusterResource, resourceCouldBeUnreserved,
- Resources.none())) {
+ if (this.reservationsContinueLooking
+ && Resources.greaterThan(resourceCalculator, clusterResource,
+ resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource = Resources.subtract(
usedExceptKillable, resourceCouldBeUnreserved);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 4d83538..05150a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1574,8 +1574,7 @@ public class LeafQueue extends AbstractCSQueue {
user.getUsed(nodePartition), limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking && nodePartition.equals(
- CommonNodeLabelsManager.NO_LABEL)) {
+ if (this.reservationsContinueLooking) {
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
Resources.subtract(user.getUsed(),
application.getCurrentReservation()), limit)) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 287dc67..cced238 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -79,12 +79,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
String nodePartition) {
// If headroom + currentReservation < required, we cannot allocate this
// require
- Resource resourceCouldBeUnReserved = application.getCurrentReservation();
- if (!application.getCSLeafQueue().getReservationContinueLooking()
- || !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
- // If we don't allow reservation continuous looking, OR we're looking at
- // non-default node partition, we won't allow to unreserve before
- // allocation.
+ Resource resourceCouldBeUnReserved =
+ application.getAppAttemptResourceUsage().getReserved(nodePartition);
+ if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+ // If we don't allow reservation continuous looking,
+ // we won't allow to unreserve before allocation.
resourceCouldBeUnReserved = Resources.none();
}
return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
@@ -583,13 +582,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Allocate...
// We will only do continuous reservation when this is not allocated from
// reserved container
- if (rmContainer == null && reservationsContinueLooking
- && node.getLabels().isEmpty()) {
+ if (rmContainer == null && reservationsContinueLooking) {
// when reservationsContinueLooking is set, we may need to unreserve
// some containers to meet this queue, its parents', or the users'
// resource limits.
- // TODO, need change here when we want to support continuous reservation
- // looking for labeled partitions.
if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
if (!needToUnreserve) {
// If we shouldn't allocate/reserve new container then we should
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 55f98d2..4ac57dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -643,6 +643,295 @@ public class TestNodeLabelContainerAllocation {
}
@Test (timeout = 120000)
+ public void testContainerReservationContinueLookingWithLabels()
+ throws Exception {
+ // Verifies reservation continue-looking inside partition "x" when AM and requests carry label "x" explicitly. Set node -> label: h1 and h2 both get "x".
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+ // inject node label manager: the MockRM returns the test's mgr so the labels assigned above are visible to the scheduler
+ MockRM rm1 = new MockRM(
+ TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+ LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 whose 2 GB AM container is explicitly labeled "x"
+ MockRMAppSubmissionData data1 =
+ MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
+ .withAppName("app1")
+ .withUser("user")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .withAmLabel("x")
+ .build();
+ RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+ .getApplicationAttemptId());
+
+ // Verify the AM container (container #1) is live on node1
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(2 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // request two 5 GB "map" containers for app1 with label expression "x"
+ am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), "x");
+
+ // Do node heartbeat to allocate first mapper on node1 (node1 then has 7 of 8 GB used)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ // Verify the first mapper (container #2) is live on node1
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(7 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // Do node heartbeat to allocate second mapper on node2
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // Verify the second mapper (container #3) is live on node2
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // node1 7 GB used, node2 5 GB used (12 GB total in partition "x")
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(12 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // request two 3 GB "reduce" containers for app1 with label expression "x"
+ am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), "x");
+
+ // Do node heartbeat on node1: only 1 GB is free there, so the 3 GB reducer is reserved instead of allocated
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ // node1 7 GB used and 3 GB reserved, node2 5 GB used; the queue "used" figures below include the 3 GB reservation (12 -> 15 GB)
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(15 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(3 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // Do node heartbeat on node2 (3 GB free): the reducer should be allocated there and the
+ // reservation on node1 released -- the labeled continue-looking path this test targets
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // Verify the reducer is live on node2 as container #5 (#4 presumably went to the released reservation)
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // node1 7 GB used and 0 GB reserved, node2 8 GB used. NOTE(review): rm1.close() below is not in a finally block, so a failed assertion leaves rm1 unclosed.
+ Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(15 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerReservationContinueLookingWithDefaultLabels()
+ throws Exception {
+ // Same scenario as testContainerReservationContinueLookingWithLabels, but
+ // neither the AM nor the ResourceRequests carry a label expression; they rely
+ // on the queue's default label expression, which the asserts expect to be "x".
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+ // inject node label manager: the MockRM returns the test's mgr so the labels assigned above are visible to the scheduler
+ MockRM rm1 = new MockRM(
+ TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+ LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 without an AM label; its 2 GB AM is still expected in partition "x"
+ MockRMAppSubmissionData data1 =
+ MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
+ .withAppName("app1")
+ .withUser("user")
+ .withAcls(null)
+ .withQueue("a1")
+ .withUnmanagedAM(false)
+ .build();
+ RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+ .getApplicationAttemptId());
+
+ // Verify the AM container (container #1) is live on node1
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(2 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // request two 5 GB "map" containers for app1 with a null label expression
+ am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), null);
+
+ // Do node heartbeat to allocate first mapper on node1 (node1 then has 7 of 8 GB used)
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ // Verify the first mapper (container #2) is live on node1
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(7 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // Do node heartbeat to allocate second mapper on node2
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // Verify the second mapper (container #3) is live on node2
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // node1 7 GB used, node2 5 GB used (12 GB total in partition "x")
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+ Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(12 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // request two 3 GB "reduce" containers for app1 with a null label expression
+ am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), null);
+
+ // Do node heartbeat on node1: only 1 GB is free there, so the 3 GB reducer is reserved instead of allocated
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ // node1 7 GB used and 3 GB reserved, node2 5 GB used; the queue "used" figures below include the 3 GB reservation (12 -> 15 GB)
+ Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(15 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(3 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ // Do node heartbeat on node2 (3 GB free): the reducer should be allocated there and the
+ // reservation on node1 released -- the labeled continue-looking path this test targets
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // Verify the reducer is live on node2 as container #5 (#4 presumably went to the released reservation)
+ containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // node1 7 GB used and 0 GB reserved, node2 8 GB used. NOTE(review): rm1.close() below is not in a finally block, so a failed assertion leaves rm1 unclosed.
+ Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+ .getReserved("x").getMemorySize());
+ Assert.assertEquals(15 * GB,
+ leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+ Assert.assertEquals(0 * GB,
+ leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
public void testRMContainerLeakInLeafQueue() throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org