You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2019/07/30 19:57:31 UTC

[hadoop] branch branch-3.1 updated: YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 36af884  YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.
36af884 is described below

commit 36af8845deac64ffff3b80c1d9ee8e91499dbdd5
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Tue Jul 30 18:58:36 2019 +0000

    YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan.
    
    (cherry picked from commit 42683aef1a694af883c14842bf41f30b91e039f3)
---
 .../scheduler/capacity/CSQueueUtils.java           |  63 ++++----
 .../capacity/TestNodeLabelContainerAllocation.java | 165 +++++++++++++++++++--
 2 files changed, 189 insertions(+), 39 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 1c69d68..2d9c0ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -250,30 +250,24 @@ public class CSQueueUtils {
 
   }
 
-  private static Resource getMaxAvailableResourceToQueue(
-      final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue,
-      Resource cluster) {
-    Set<String> nodeLabels = queue.getNodeLabelsForQueue();
-    Resource totalAvailableResource = Resources.createResource(0, 0);
-
-    for (String partition : nodeLabels) {
-      // Calculate guaranteed resource for a label in a queue by below logic.
-      // (total label resource) * (absolute capacity of label in that queue)
-      Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
-
-      // Available resource in queue for a specific label will be calculated as
-      // {(guaranteed resource for a label in a queue) -
-      // (resource usage of that label in the queue)}
-      // Finally accumulate this available resource to get total.
-      Resource available = (Resources.greaterThan(rc, cluster,
-          queueGuranteedResource,
-          queue.getQueueResourceUsage().getUsed(partition))) ? Resources
-          .componentwiseMax(Resources.subtractFrom(queueGuranteedResource,
-              queue.getQueueResourceUsage().getUsed(partition)), Resources
-              .none()) : Resources.none();
-      Resources.addTo(totalAvailableResource, available);
-    }
-    return totalAvailableResource;
+  private static Resource getMaxAvailableResourceToQueuePartition(
+      final ResourceCalculator rc, CSQueue queue,
+      Resource cluster, String partition) {
+    // Calculate guaranteed resource for a label in a queue by below logic.
+    // (total label resource) * (absolute capacity of label in that queue)
+    Resource queueGuaranteedResource = queue.getEffectiveCapacity(partition);
+
+    // Available resource in queue for a specific label will be calculated as
+    // {(guaranteed resource for a label in a queue) -
+    // (resource usage of that label in the queue)}
+    Resource available = (Resources.greaterThan(rc, cluster,
+        queueGuaranteedResource,
+        queue.getQueueResourceUsage().getUsed(partition))) ? Resources
+        .componentwiseMax(Resources.subtractFrom(queueGuaranteedResource,
+            queue.getQueueResourceUsage().getUsed(partition)), Resources
+            .none()) : Resources.none();
+
+    return available;
   }
 
   /**
@@ -304,16 +298,27 @@ public class CSQueueUtils {
           queueResourceUsage.getNodePartitionsSet())) {
         updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
             partition, childQueue);
+
+        // Update queue metrics w.r.t node labels.
+        // In QueueMetrics, null label is handled the same as NO_LABEL.
+        // This is because queue metrics for partitions are not tracked.
+        // In the future, will have to change this when/if queue metrics
+        // for partitions also get tracked.
+        childQueue.getMetrics().setAvailableResourcesToQueue(
+            partition,
+            getMaxAvailableResourceToQueuePartition(rc, childQueue,
+                cluster, partition));
       }
     } else {
       updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
           nodePartition, childQueue);
-    }
 
-    // Update queue metrics w.r.t node labels. In a generic way, we can
-    // calculate available resource from all labels in cluster.
-    childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
-        getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
+      // Same as above.
+      childQueue.getMetrics().setAvailableResourcesToQueue(
+          nodePartition,
+          getMaxAvailableResourceToQueuePartition(rc, childQueue,
+              cluster, nodePartition));
+    }
    }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 9cfddd6..737db5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1988,6 +1988,15 @@ public class TestNodeLabelContainerAllocation {
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
     MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b");
+    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
+
     // app1 -> a
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
@@ -1995,7 +2004,6 @@ public class TestNodeLabelContainerAllocation {
     // app1 asks for 5 partition=x containers
     am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
     // NM1 do 50 heartbeats
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
 
     SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
@@ -2019,17 +2027,23 @@ public class TestNodeLabelContainerAllocation {
     Assert.assertEquals(10 * GB,
         reportNm2.getAvailableResource().getMemorySize());
 
-    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
-    assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB());
-    assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
+
+    // The total memory tracked by QueueMetrics is 0GB for the default partition
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
+        rootQueue.getMetrics().getAllocatedMB());
 
     // Kill all apps in queue a
     cs.killAllAppsInQueue("a");
     rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
     rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
 
-    assertEquals(0 * GB, leafQueue.getMetrics().getUsedAMResourceMB());
-    assertEquals(0, leafQueue.getMetrics().getUsedAMResourceVCores());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
     rm1.close();
   }
 
@@ -2120,10 +2134,9 @@ public class TestNodeLabelContainerAllocation {
         reportNm2.getAvailableResource().getMemorySize());
 
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
-    double delta = 0.0001;
     // 3GB is used from label x quota. 1.5 GB is remaining from default label.
     // 2GB is remaining from label x.
-    assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
     assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
 
     // app1 asks for 1 default partition container
@@ -2139,10 +2152,142 @@ public class TestNodeLabelContainerAllocation {
     Assert.assertEquals(2, schedulerNode2.getNumContainers());
 
     // 3GB is used from label x quota. 2GB used from default label.
-    // So total 2.5 GB is remaining.
-    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    // So 0.5 GB is remaining from default label.
+    assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB());
     assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
 
+    // The total memory tracked by QueueMetrics is 10GB
+    // for the default partition
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
+        rootQueue.getMetrics().getAllocatedMB());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testQueueMetricsWithMixedLabels() throws Exception {
+    // There is only one queue which can access both default label and label x.
+    // There are two nodes of 10GB label x and 12GB no label.
+    // The test is to make sure that the queue metrics is only tracking the
+    // allocations and availability from default partition
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 100);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 100);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+
+    // set node -> label
+    // label x exclusivity is set to true
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", true)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a");
+    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 do 50 heartbeats
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(6, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(12 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+    // app2 -> a
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", "");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // app2 asks for 5 partition= containers
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
+    // NM2 do 50 heartbeats
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(6, schedulerNode2.getNumContainers());
+
+    reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(6 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+    // The total memory tracked by QueueMetrics is 12GB
+    // for the default partition
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
+        rootQueue.getMetrics().getAllocatedMB());
+
+    // Kill all apps in queue a
+    cs.killAllAppsInQueue("a");
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+
+    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
     rm1.close();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org