You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/04 21:51:27 UTC
[kafka] branch 2.0 updated: KAFKA-7660: fix streams and Metrics
memory leaks (#5980)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 985287a KAFKA-7660: fix streams and Metrics memory leaks (#5980)
985287a is described below
commit 985287a274690b25d26e822a593e1665c504e277
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Dec 4 15:51:18 2018 -0600
KAFKA-7660: fix streams and Metrics memory leaks (#5980)
---
.../org/apache/kafka/common/metrics/Metrics.java | 5 ++
.../org/apache/kafka/common/metrics/Sensor.java | 10 +++-
.../apache/kafka/common/metrics/MetricsTest.java | 16 ++++++
.../internals/metrics/StreamsMetricsImpl.java | 24 +++++----
.../{ => metrics}/StreamsMetricsImplTest.java | 61 ++++++++++++++++++++--
5 files changed, 100 insertions(+), 16 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index a6da9f9..9e2b6f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static java.util.Collections.emptyList;
+
/**
* A registry of sensors and metrics.
* <p>
@@ -446,6 +448,9 @@ public class Metrics implements Closeable {
removeMetric(metric.metricName());
log.debug("Removed sensor with name {}", name);
childSensors = childrenSensors.remove(sensor);
+ for (final Sensor parent : sensor.parents()) {
+ childrenSensors.getOrDefault(parent, emptyList()).remove(sensor);
+ }
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ccbe8aa..1af9419 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -32,6 +31,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
/**
* A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
* message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -133,6 +135,10 @@ public final class Sensor {
return this.name;
}
+ List<Sensor> parents() {
+ return unmodifiableList(asList(parents));
+ }
+
/**
* Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
*/
@@ -291,7 +297,7 @@ public final class Sensor {
}
synchronized List<KafkaMetric> metrics() {
- return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
+ return unmodifiableList(new LinkedList<>(this.metrics.values()));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5c75d03..6ad4833 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.metrics;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -198,6 +200,20 @@ public class MetricsTest {
}
@Test
+ public void testRemoveChildSensor() {
+ final Metrics metrics = new Metrics();
+
+ final Sensor parent = metrics.sensor("parent");
+ final Sensor child = metrics.sensor("child", parent);
+
+ assertEquals(singletonList(child), metrics.childrenSensors().get(parent));
+
+ metrics.removeSensor("child");
+
+ assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+ }
+
+ @Test
public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 51665e6..ba0b58f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -110,11 +110,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllTaskLevelSensors(final String taskName) {
final String key = threadName + "." + taskName;
synchronized (taskLevelSensors) {
- if (taskLevelSensors.containsKey(key)) {
- while (!taskLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(taskLevelSensors.get(key).pop());
- }
- taskLevelSensors.remove(key);
+ final Deque<String> sensors = taskLevelSensors.remove(key);
+ while (sensors != null && !sensors.isEmpty()) {
+ metrics.removeSensor(sensors.pop());
}
}
}
@@ -143,11 +141,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
final String key = threadName + "." + taskName + "." + cacheName;
synchronized (cacheLevelSensors) {
- if (cacheLevelSensors.containsKey(key)) {
- while (!cacheLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(cacheLevelSensors.get(key).pop());
- }
- cacheLevelSensors.remove(key);
+ final Deque<String> strings = cacheLevelSensors.remove(key);
+ while (strings != null && !strings.isEmpty()) {
+ metrics.removeSensor(strings.pop());
}
}
}
@@ -361,10 +357,16 @@ public class StreamsMetricsImpl implements StreamsMetrics {
Objects.requireNonNull(sensor, "Sensor is null");
metrics.removeSensor(sensor.name());
- final Sensor parent = parentSensors.get(sensor);
+ final Sensor parent = parentSensors.remove(sensor);
if (parent != null) {
metrics.removeSensor(parent.name());
}
}
+ /**
+ * Visible for testing
+ */
+ Map<Sensor, Sensor> parentSensors() {
+ return Collections.unmodifiableMap(parentSensors);
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 64%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index a72dc79..cccc458 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,14 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.common.metrics.stats.Count;
import org.junit.Test;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.Assert.assertEquals;
public class StreamsMetricsImplTest {
@@ -57,6 +63,55 @@ public class StreamsMetricsImplTest {
final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
+
+ assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+ }
+
+ @Test
+ public void testMultiLevelSensorRemoval() {
+ final Metrics registry = new Metrics();
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
+ for (final MetricName defaultMetric : registry.metrics().keySet()) {
+ registry.removeMetric(defaultMetric);
+ }
+
+ final String taskName = "taskName";
+ final String operation = "operation";
+ final Map<String, String> threadTags = mkMap(mkEntry("threadkey", "value"));
+
+ final Map<String, String> taskTags = mkMap(mkEntry("taskkey", "value"));
+
+ final Sensor parent1 = metrics.threadLevelSensor(operation, Sensor.RecordingLevel.DEBUG);
+ parent1.add(new MetricName("name", "group", "description", threadTags), new Count());
+
+ assertEquals(1, registry.metrics().size());
+
+ final Sensor sensor1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+ sensor1.add(new MetricName("name", "group", "description", taskTags), new Count());
+
+ assertEquals(2, registry.metrics().size());
+
+ metrics.removeAllTaskLevelSensors(taskName);
+
+ assertEquals(1, registry.metrics().size());
+
+ final Sensor parent2 = metrics.threadLevelSensor(operation, Sensor.RecordingLevel.DEBUG);
+ parent2.add(new MetricName("name", "group", "description", threadTags), new Count());
+
+ assertEquals(1, registry.metrics().size());
+
+ final Sensor sensor2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+ sensor2.add(new MetricName("name", "group", "description", taskTags), new Count());
+
+ assertEquals(2, registry.metrics().size());
+
+ metrics.removeAllTaskLevelSensors(taskName);
+
+ assertEquals(1, registry.metrics().size());
+
+ metrics.removeAllThreadLevelSensors();
+
+ assertEquals(0, registry.metrics().size());
}
@Test
@@ -90,7 +145,7 @@ public class StreamsMetricsImplTest {
final String entity = "entity";
final String operation = "put";
- final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+ final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName, scope, entity, operation, Sensor.RecordingLevel.DEBUG);
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
// 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor