You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2022/05/18 16:29:28 UTC
[hadoop] branch trunk updated: YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation
This is an automated email from the ASF dual-hosted git repository.
bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e6a6d18809 YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation
0e6a6d18809 is described below
commit 0e6a6d18809c1958e3aaae88f0d3ce5bf380b350
Author: 9uapaw <gy...@gmail.com>
AuthorDate: Mon May 16 10:40:46 2022 +0200
YARN-11152. QueueMetrics is leaking memory when creating a new queue during reinitialisation
---
.../scheduler/PartitionQueueMetrics.java | 1 +
.../resourcemanager/scheduler/QueueMetrics.java | 31 ++++++++++-
.../scheduler/capacity/AbstractCSQueue.java | 1 +
.../capacity/CapacitySchedulerQueueHelpers.java | 29 +++++++++++
.../scheduler/capacity/TestCapacityScheduler.java | 60 ++++++++++++++++++++++
5 files changed, 121 insertions(+), 1 deletion(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
index f43131809a0..02eaa7bd9b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
@@ -40,6 +40,7 @@ public class PartitionQueueMetrics extends QueueMetrics {
String parentMetricName =
partition + METRIC_NAME_DELIMITER + newQueueName;
setParent(getQueueMetrics().get(parentMetricName));
+ storedPartitionMetrics = null;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 6f9b1ab47b2..3e6a1d7d712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -22,7 +22,9 @@ import static org.apache.hadoop.metrics2.lib.Interns.info;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,12 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,7 +137,7 @@ public class QueueMetrics implements MetricsSource {
protected final MetricsRegistry registry;
protected final String queueName;
private QueueMetrics parent;
- private final Queue parentQueue;
+ private Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
@@ -177,6 +181,7 @@ public class QueueMetrics implements MetricsSource {
"AggregatePreemptedSeconds.";
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
+ protected Set<String> storedPartitionMetrics = Sets.newConcurrentHashSet();
public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
@@ -338,6 +343,7 @@ public class QueueMetrics implements MetricsSource {
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
this.queueName));
getQueueMetrics().put(metricName, queueMetrics);
+ registerPartitionMetricsCreation(metricName);
return queueMetrics;
} else {
return metrics;
@@ -380,6 +386,7 @@ public class QueueMetrics implements MetricsSource {
partitionJMXStr));
}
getQueueMetrics().put(metricName, metrics);
+ registerPartitionMetricsCreation(metricName);
}
return metrics;
}
@@ -1332,4 +1339,26 @@ public class QueueMetrics implements MetricsSource {
public Queue getParentQueue() {
return parentQueue;
}
+
+ protected void registerPartitionMetricsCreation(String metricName) { // records metricName in storedPartitionMetrics
+ if (storedPartitionMetrics != null) { // PartitionQueueMetrics sets this field to null; nothing is recorded then
+ storedPartitionMetrics.add(metricName);
+ }
+ }
+
+ public void setParentQueue(Queue parentQueue) { // updates parentQueue here and on the recorded partition metrics
+ this.parentQueue = parentQueue;
+
+ if (storedPartitionMetrics == null) { // no names were recorded (PartitionQueueMetrics nulls the set)
+ return;
+ }
+
+ for (String partitionMetric : storedPartitionMetrics) {
+ QueueMetrics metric = getQueueMetrics().get(partitionMetric); // look the recorded name up in the getQueueMetrics() map
+
+ if (metric != null && metric.parentQueue != null) { // entries without a parent queue are left unchanged
+ metric.parentQueue = parentQueue;
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 124131391e2..1a5a1ce0fd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -262,6 +262,7 @@ public abstract class AbstractCSQueue implements CSQueue {
@Override
public void setParent(CSQueue newParentQueue) {
this.parent = newParentQueue;
+ getMetrics().setParentQueue(newParentQueue); // hand the same parent object to this queue's metrics
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
index 7e362731f86..1adde7d8d19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java
@@ -90,6 +90,35 @@ public final class CapacitySchedulerQueueHelpers {
return conf;
}
+ public static CapacitySchedulerConfiguration setupAdditionalQueues( // builds root -> {a, b}, a -> {a1, a2, a3}, b -> {b1, b2, b3}
+ CapacitySchedulerConfiguration conf) {
+
+ // Define the top-level queues "a" and "b" under root
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b"});
+
+ conf.setCapacity(A, A_CAPACITY);
+ conf.setCapacity(B, B_CAPACITY);
+
+ // Define the 2nd-level queues; the children of "a" get capacities 30/30/40
+ conf.setQueues(A, new String[]{"a1", "a2", "a3"});
+ conf.setCapacity(A1, 30.0f);
+ conf.setUserLimitFactor(A1, 100.0f);
+ conf.setCapacity(A2, 30.0f);
+ conf.setUserLimitFactor(A2, 100.0f);
+ conf.setCapacity("root.a.a3", 40.0f); // a3 is addressed by its literal path rather than a constant
+ conf.setUserLimitFactor("root.a.a3", 100.0f);
+
+ conf.setQueues(B, new String[]{"b1", "b2", "b3"});
+ conf.setCapacity(B1, B1_CAPACITY);
+ conf.setUserLimitFactor(B1, 100.0f);
+ conf.setCapacity(B2, B2_CAPACITY);
+ conf.setUserLimitFactor(B2, 100.0f);
+ conf.setCapacity(B3, B3_CAPACITY);
+ conf.setUserLimitFactor(B3, 100.0f);
+
+ return conf; // the same instance that was passed in, now modified
+ }
+
/**
* @param conf, to be modified
* @return CS configuration which has deleted all children of queue(b)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 4a9e45e756f..7c407dfd4ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
@@ -81,6 +82,7 @@ import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -2663,6 +2665,64 @@ public class TestCapacityScheduler {
ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
}
+ /**
+ * Tests that after reinitialising with the setupAdditionalQueues configuration, the metrics of root.a.a1, root.a.a3 and default.root.a.a3 return the same ParentQueue instance.
+ * @throws Exception if any call made by the test throws
+ */
+ @Test
+ public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception {
+ // Initialize resource map
+ Map<String, ResourceInformation> riMap = new HashMap<>();
+
+ // Initialize mandatory resources
+ ResourceInformation memory =
+ ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
+ ResourceInformation.MEMORY_MB.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ ResourceInformation vcores =
+ ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
+ ResourceInformation.VCORES.getUnits(),
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setResourceComparator(DominantResourceCalculator.class);
+
+ setupQueueConfiguration(csConf);
+
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+
+ // Don't reset resource types since we have already configured resource
+ // types
+ conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+
+ MockRM rm = new MockRM(conf);
+ rm.start();
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ csConf = new CapacitySchedulerConfiguration();
+ setupAdditionalQueues(csConf);
+ cs.reinitialize(csConf, cs.getRMContext());
+ QueueMetrics a3DefaultPartitionMetrics = QueueMetrics.getQueueMetrics().get(
+ "default.root.a.a3");
+
+ Assert.assertSame("Different ParentQueue of siblings is a sign of a memory leak",
+ QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
+ QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue());
+
+ Assert.assertSame("Different ParentQueue of partition metrics is a sign of a memory leak",
+ QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
+ a3DefaultPartitionMetrics.getParentQueue());
+ }
+
@Test
public void testCSQueueMetrics() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org