You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/07/30 19:57:31 UTC
[hadoop] branch branch-3.1 updated: YARN-9596: QueueMetrics has
incorrect metrics when labelled partitions are involved. Contributed by
Muhammad Samir Khan.
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 36af884 YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.
36af884 is described below
commit 36af8845deac64ffff3b80c1d9ee8e91499dbdd5
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Tue Jul 30 18:58:36 2019 +0000
YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.
(cherry picked from commit 42683aef1a694af883c14842bf41f30b91e039f3)
---
.../scheduler/capacity/CSQueueUtils.java | 63 ++++----
.../capacity/TestNodeLabelContainerAllocation.java | 165 +++++++++++++++++++--
2 files changed, 189 insertions(+), 39 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 1c69d68..2d9c0ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -250,30 +250,24 @@ public class CSQueueUtils {
}
- private static Resource getMaxAvailableResourceToQueue(
- final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
- Resource cluster) {
- Set<String> nodeLabels = queue.getNodeLabelsForQueue();
- Resource totalAvailableResource = Resources.createResource(0, 0);
-
- for (String partition : nodeLabels) {
- // Calculate guaranteed resource for a label in a queue by below logic.
- // (total label resource) * (absolute capacity of label in that queue)
- Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
-
- // Available resource in queue for a specific label will be calculated as
- // {(guaranteed resource for a label in a queue) -
- // (resource usage of that label in the queue)}
- // Finally accumulate this available resource to get total.
- Resource available = (Resources.greaterThan(rc, cluster,
- queueGuranteedResource,
- queue.getQueueResourceUsage().getUsed(partition))) ? Resources
- .componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
- queue.getQueueResourceUsage().getUsed(partition)), Resources
- .none()) : Resources.none();
- Resources.addTo(totalAvailableResource, available);
- }
- return totalAvailableResource;
+ private static Resource getMaxAvailableResourceToQueuePartition(
+ final ResourceCalculator rc, CSQueue queue,
+ Resource cluster, String partition) {
+ // Calculate guaranteed resource for a label in a queue by below logic.
+ // (total label resource) * (absolute capacity of label in that queue)
+ Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition);
+
+ // Available resource in queue for a specific label will be calculated as
+ // {(guaranteed resource for a label in a queue) -
+ // (resource usage of that label in the queue)}
+ Resource available = (Resources.greaterThan(rc, cluster,
+ queueGuaranteedResource,
+ queue.getQueueResourceUsage().getUsed(partition))) ? Resources
+ .componentwiseMax(Resources.subtractFrom(queueGuaranteedResource,
+ queue.getQueueResourceUsage().getUsed(partition)), Resources
+ .none()) : Resources.none();
+
+ return available;
}
/**
@@ -304,16 +298,27 @@ public class CSQueueUtils {
queueResourceUsage.getNodePartitionsSet())) {
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
partition, childQueue);
+
+ // Update queue metrics w.r.t node labels.
+ // In QueueMetrics, null label is handled the same as NO_LABEL.
+ // This is because queue metrics for partitions are not tracked.
+ // In the future, will have to change this when/if queue metrics
+ // for partitions also get tracked.
+ childQueue.getMetrics().setAvailableResourcesToQueue(
+ partition,
+ getMaxAvailableResourceToQueuePartition(rc, childQueue,
+ cluster, partition));
}
} else {
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
nodePartition, childQueue);
- }
- // Update queue metrics w.r.t node labels. In a generic way, we can
- // calculate available resource from all labels in cluster.
- childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
- getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
+ // Same as above.
+ childQueue.getMetrics().setAvailableResourcesToQueue(
+ nodePartition,
+ getMaxAvailableResourceToQueuePartition(rc, childQueue,
+ cluster, nodePartition));
+ }
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 9cfddd6..737db5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1988,6 +1988,15 @@ public class TestNodeLabelContainerAllocation {
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+ LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b");
+ assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
+
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
@@ -1995,7 +2004,6 @@ public class TestNodeLabelContainerAllocation {
// app1 asks for 5 partition=x containers
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
@@ -2019,17 +2027,23 @@ public class TestNodeLabelContainerAllocation {
Assert.assertEquals(10 * GB,
reportNm2.getAvailableResource().getMemorySize());
- LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
- assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB());
- assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+ assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
+
+ // The total memory tracked by QueueMetrics is 0GB for the default partition
+ CSQueue rootQueue = cs.getRootQueue();
+ assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
+ rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
- assertEquals(0 * GB, leafQueue.getMetrics().getUsedAMResourceMB());
- assertEquals(0, leafQueue.getMetrics().getUsedAMResourceVCores());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+ assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
rm1.close();
}
@@ -2120,10 +2134,9 @@ public class TestNodeLabelContainerAllocation {
reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
- double delta = 0.0001;
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
// 2GB is remaining from label x.
- assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+ assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
// app1 asks for 1 default partition container
@@ -2139,10 +2152,142 @@ public class TestNodeLabelContainerAllocation {
Assert.assertEquals(2, schedulerNode2.getNumContainers());
// 3GB is used from label x quota. 2GB used from default label.
- // So total 2.5 GB is remaining.
- assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+ // So 0.5 GB is remaining from default label.
+ assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB());
assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
+ // The total memory tracked by QueueMetrics is 10GB
+ // for the default partition
+ CSQueue rootQueue = cs.getRootQueue();
+ assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
+ rootQueue.getMetrics().getAllocatedMB());
+
+ rm1.close();
+ }
+
+ @Test
+ public void testQueueMetricsWithMixedLabels() throws Exception {
+ // There is only one queue which can access both default label and label x.
+ // There are two nodes of 10GB label x and 12GB no label.
+ // The test is to make sure that the queue metrics is only tracking the
+ // allocations and availability from default partition
+
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+ this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a"});
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(queueA, 100);
+ csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+ csConf.setCapacityByLabel(queueA, "x", 100);
+ csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+
+ // set node -> label
+ // label x exclusivity is set to true
+ mgr.addToCluserNodeLabels(
+ ImmutableSet.of(NodeLabel.newInstance("x", true)));
+ mgr.addLabelsToNode(
+ ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
+ assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+ // app1 -> a
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // app1 asks for 5 partition=x containers
+ am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+ // NM1 do 50 heartbeats
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // app1 gets all resource in partition=x
+ Assert.assertEquals(6, schedulerNode1.getNumContainers());
+
+ SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+ .getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(4 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+ .getNodeReport(nm2.getNodeId());
+ Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+ Assert.assertEquals(12 * GB,
+ reportNm2.getAvailableResource().getMemorySize());
+
+ assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+ assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+ // app2 -> a
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", "");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // app2 asks for 5 partition= containers
+ am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
+ // NM2 do 50 heartbeats
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // app1 gets all resource in partition=x
+ Assert.assertEquals(6, schedulerNode2.getNumContainers());
+
+ reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+ Assert.assertEquals(4 * GB,
+ reportNm1.getAvailableResource().getMemorySize());
+
+ reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
+ Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
+ Assert.assertEquals(6 * GB,
+ reportNm2.getAvailableResource().getMemorySize());
+
+ assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
+ assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+ // The total memory tracked by QueueMetrics is 12GB
+ // for the default partition
+ CSQueue rootQueue = cs.getRootQueue();
+ assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
+ rootQueue.getMetrics().getAllocatedMB());
+
+ // Kill all apps in queue a
+ cs.killAllAppsInQueue("a");
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+ rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+
+ assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+ assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
rm1.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org