You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/05/18 16:29:28 UTC

[hadoop] branch trunk updated: YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation

This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e6a6d18809 YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation
0e6a6d18809 is described below

commit 0e6a6d18809c1958e3aaae88f0d3ce5bf380b350
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Mon May 16 10:40:46 2022 +0200

    YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation
---
 .../scheduler/PartitionQueueMetrics.java           |  1 +
 .../resourcemanager/scheduler/QueueMetrics.java    | 31 ++++++++++-
 .../scheduler/capacity/AbstractCSQueue.java        |  1 +
 .../capacity/CapacitySchedulerQueueHelpers.java    | 29 +++++++++++
 .../scheduler/capacity/TestCapacityScheduler.java  | 60 ++++++++++++++++++++++
 5 files changed, 121 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
index f43131809a0..02eaa7bd9b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
@@ -40,6 +40,7 @@ public class PartitionQueueMetrics extends QueueMetrics {
       String parentMetricName =
           partition + METRIC_NAME_DELIMITER + newQueueName;
       setParent(getQueueMetrics().get(parentMetricName));
+      storedPartitionMetrics = null;
     }
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 6f9b1ab47b2..3e6a1d7d712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -22,7 +22,9 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,12 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,7 +137,7 @@ public class QueueMetrics implements MetricsSource {
   protected final MetricsRegistry registry;
   protected final String queueName;
   private QueueMetrics parent;
-  private final Queue parentQueue;
+  private Queue parentQueue;
   protected final MetricsSystem metricsSystem;
   protected final Map<String, QueueMetrics> users;
   protected final Configuration conf;
@@ -177,6 +181,7 @@ public class QueueMetrics implements MetricsSource {
     "AggregatePreemptedSeconds.";
   private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
     "Aggregate Preempted Seconds for NAME";
+  protected Set<String> storedPartitionMetrics = Sets.newConcurrentHashSet();
 
   public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
       boolean enableUserMetrics, Configuration conf) {
@@ -338,6 +343,7 @@ public class QueueMetrics implements MetricsSource {
           queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
               this.queueName));
       getQueueMetrics().put(metricName, queueMetrics);
+      registerPartitionMetricsCreation(metricName);
       return queueMetrics;
     } else {
       return metrics;
@@ -380,6 +386,7 @@ public class QueueMetrics implements MetricsSource {
                 partitionJMXStr));
       }
       getQueueMetrics().put(metricName, metrics);
+      registerPartitionMetricsCreation(metricName);
     }
     return metrics;
   }
@@ -1332,4 +1339,26 @@ public class QueueMetrics implements MetricsSource {
   public Queue getParentQueue() {
     return parentQueue;
   }
+
+  protected void registerPartitionMetricsCreation(String metricName) {
+    if (storedPartitionMetrics != null) { // PartitionQueueMetrics may set this field to null
+      storedPartitionMetrics.add(metricName); // remembered so setParentQueue can look it up
+    }
+  }
+
+  public void setParentQueue(Queue parentQueue) {
+    this.parentQueue = parentQueue;
+    // storedPartitionMetrics can be null: PartitionQueueMetrics assigns null to it.
+    if (storedPartitionMetrics == null) {
+      return;
+    }
+    // Propagate the new parent to the partition metrics registered by this instance.
+    for (String partitionMetric : storedPartitionMetrics) {
+      QueueMetrics metric = getQueueMetrics().get(partitionMetric);
+      // Only entries that already hold a non-null parentQueue are re-pointed.
+      if (metric != null && metric.parentQueue != null) {
+        metric.parentQueue = parentQueue;
+      }
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 124131391e2..1a5a1ce0fd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -262,6 +262,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   @Override
   public void setParent(CSQueue newParentQueue) {
     this.parent = newParentQueue;
+    getMetrics().setParentQueue(newParentQueue); // keep the metrics' parentQueue in sync
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
index 7e362731f86..1adde7d8d19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
@@ -90,6 +90,35 @@ public final class CapacitySchedulerQueueHelpers {
     return conf;
   }
 
+  public static CapacitySchedulerConfiguration setupAdditionalQueues(
+      CapacitySchedulerConfiguration conf) {
+    // Populates conf in place and returns it: a and b under root, three children under A and B.
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b"});
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues; the a1/a2/a3 capacities are 30 + 30 + 40
+    conf.setQueues(A, new String[]{"a1", "a2", "a3"});
+    conf.setCapacity(A1, 30.0f);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setCapacity(A2, 30.0f);
+    conf.setUserLimitFactor(A2, 100.0f);
+    conf.setCapacity("root.a.a3", 40.0f); // a3 is addressed by a literal path, unlike A1/A2
+    conf.setUserLimitFactor("root.a.a3", 100.0f);
+
+    conf.setQueues(B, new String[]{"b1", "b2", "b3"});
+    conf.setCapacity(B1, B1_CAPACITY);
+    conf.setUserLimitFactor(B1, 100.0f);
+    conf.setCapacity(B2, B2_CAPACITY);
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setCapacity(B3, B3_CAPACITY);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    return conf;
+  }
+
   /**
    * @param conf, to be modified
    * @return CS configuration which has deleted all children of queue(b)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 4a9e45e756f..7c407dfd4ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
@@ -81,6 +82,7 @@ import java.util.concurrent.CyclicBarrier;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -2663,6 +2665,64 @@ public class TestCapacityScheduler {
         ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
   }
 
+  /**
+   * Tests that, after reinitialisation, a3's metrics report the same parent queue as a1's.
+   * @throws Exception on any failure while running the test
+   */
+  @Test
+  public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception {
+    // Initialize resource map
+    Map<String, ResourceInformation> riMap = new HashMap<>();
+
+    // Initialize mandatory resources
+    ResourceInformation memory =
+        ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
+            ResourceInformation.MEMORY_MB.getUnits(),
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores =
+        ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
+            ResourceInformation.VCORES.getUnits(),
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setResourceComparator(DominantResourceCalculator.class);
+
+    setupQueueConfiguration(csConf);
+
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+
+    // Don't reset resource types since we have already configured resource
+    // types
+    conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start(); // NOTE(review): rm is never stopped in this test - confirm that is intended
+    // Reinitialise the running scheduler with a fresh configuration from setupAdditionalQueues.
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    csConf = new CapacitySchedulerConfiguration();
+    setupAdditionalQueues(csConf);
+    cs.reinitialize(csConf, cs.getRMContext());
+    QueueMetrics a3DefaultPartitionMetrics = QueueMetrics.getQueueMetrics().get(
+        "default.root.a.a3"); // presumably <partition>.<queue path> - TODO confirm
+
+    Assert.assertSame("Different ParentQueue of siblings is a sign of a memory leak",
+        QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
+        QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue());
+
+    Assert.assertSame("Different ParentQueue of partition metrics is a sign of a memory leak",
+        QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
+        a3DefaultPartitionMetrics.getParentQueue());
+  }
+
   @Test
   public void testCSQueueMetrics() throws Exception {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org