You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/30 04:10:33 UTC
[kafka] branch trunk updated: KAFKA-7660: Fix child sensor memory
leak (#5974)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b7d95da KAFKA-7660: Fix child sensor memory leak (#5974)
b7d95da is described below
commit b7d95da88df3a521870fa4f6d6e53cee7e679d40
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Nov 29 22:10:24 2018 -0600
KAFKA-7660: Fix child sensor memory leak (#5974)
A heap dump provided by Patrik Kleindl in https://issues.apache.org/jira/browse/KAFKA-7660 identifies the childrenSensors map in Metrics as keeping references to sensors alive after they have been removed.
This PR fixes it and adds a test to be sure.
Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../java/org/apache/kafka/common/metrics/Metrics.java | 5 +++++
.../java/org/apache/kafka/common/metrics/Sensor.java | 10 ++++++++--
.../org/apache/kafka/common/metrics/MetricsTest.java | 16 ++++++++++++++++
3 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index a6da9f9..9e2b6f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static java.util.Collections.emptyList;
+
/**
* A registry of sensors and metrics.
* <p>
@@ -446,6 +448,9 @@ public class Metrics implements Closeable {
removeMetric(metric.metricName());
log.debug("Removed sensor with name {}", name);
childSensors = childrenSensors.remove(sensor);
+ for (final Sensor parent : sensor.parents()) {
+ childrenSensors.getOrDefault(parent, emptyList()).remove(sensor);
+ }
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ccbe8aa..1af9419 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -32,6 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
/**
* A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
* message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -133,6 +135,10 @@ public final class Sensor {
return this.name;
}
+ List<Sensor> parents() {
+ return unmodifiableList(asList(parents));
+ }
+
/**
* Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
*/
@@ -291,7 +297,7 @@ public final class Sensor {
}
synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
+ return unmodifiableList(new LinkedList<>(this.metrics.values()));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 3184aeb..98b468a 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.metrics;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -203,6 +205,20 @@ public class MetricsTest {
}
@Test
+ public void testRemoveChildSensor() {
+ final Metrics metrics = new Metrics();
+
+ final Sensor parent = metrics.sensor("parent");
+ final Sensor child = metrics.sensor("child", parent);
+
+ assertEquals(singletonList(child), metrics.childrenSensors().get(parent));
+
+ metrics.removeSensor("child");
+
+ assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+ }
+
+ @Test
public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1");