You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/01 20:59:45 UTC
[kafka] branch 1.0 updated: KAFKA-6925: fix parentSensors memory leak (#5108) (#5119)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new fb87ba3 KAFKA-6925: fix parentSensors memory leak (#5108) (#5119)
fb87ba3 is described below
commit fb87ba31fe018b11ba2c939bc42ea1a2795eec99
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 1 15:59:21 2018 -0500
KAFKA-6925: fix parentSensors memory leak (#5108) (#5119)
Previously, removing a sensor did not also remove its entry from the parentSensors map, which was effectively a memory leak.
Add a test to verify that removed sensors get removed from the underlying registry as well as the parentSensors map.
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/StreamsMetricsImpl.java | 1 +
.../processor/internals/StreamsMetricsImplTest.java | 14 +++++++++++++-
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 03bbceb..03a4819 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -222,6 +222,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor parent = parentSensors.get(sensor);
if (parent != null) {
metrics.removeSensor(parent.name());
+ parentSensors.remove(sensor);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 7b16246..7666e42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -19,11 +19,15 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -53,19 +57,27 @@ public class StreamsMetricsImplTest {
String entity = "entity";
String operation = "put";
Map<String, String> tags = new HashMap<>();
- StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+ final Metrics metrics = new Metrics();
+ final Map<MetricName, KafkaMetric> initialMetrics = Collections.unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
+ StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, groupName, tags);
Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor1);
+ Assert.assertEquals(initialMetrics, metrics.metrics());
Sensor sensor1a = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG, sensor1);
streamsMetrics.removeSensor(sensor1a);
+ Assert.assertEquals(initialMetrics, metrics.metrics());
Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor2);
+ Assert.assertEquals(initialMetrics, metrics.metrics());
Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
+ Assert.assertEquals(initialMetrics, metrics.metrics());
+
+ Assert.assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors);
}
@Test
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.