You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/29 23:10:02 UTC
[kafka] branch trunk updated: KAFKA-7660: fix parentSensors memory
leak (#5953)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bfbc32d KAFKA-7660: fix parentSensors memory leak (#5953)
bfbc32d is described below
commit bfbc32d9bc9e63c5e02840aeed4de157654dc5e8
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Nov 29 17:09:50 2018 -0600
KAFKA-7660: fix parentSensors memory leak (#5953)
In StreamsMetricsImpl, the parentSensors map was keeping references to Sensors after the sensors themselves had been removed.
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/ProcessorNode.java | 2 +
.../internals/metrics/StreamsMetricsImpl.java | 40 +++++-----
.../{ => metrics}/StreamsMetricsImplTest.java | 89 ++++++++++++++++++----
3 files changed, 98 insertions(+), 33 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8483791..ec4f8e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -234,9 +234,11 @@ public class ProcessorNode<K, V> {
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
return sensor;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 8ec2711..dd6cc4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -115,11 +115,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllTaskLevelSensors(final String taskName) {
final String key = taskSensorPrefix(taskName);
synchronized (taskLevelSensors) {
- if (taskLevelSensors.containsKey(key)) {
- while (!taskLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(taskLevelSensors.get(key).pop());
- }
- taskLevelSensors.remove(key);
+ final Deque<String> sensors = taskLevelSensors.remove(key);
+ while (sensors != null && !sensors.isEmpty()) {
+ metrics.removeSensor(sensors.pop());
}
}
}
@@ -152,10 +150,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
final String key = nodeSensorPrefix(taskName, processorNodeName);
synchronized (nodeLevelSensors) {
- if (nodeLevelSensors.containsKey(key)) {
- while (!nodeLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(nodeLevelSensors.get(key).pop());
- }
+ final Deque<String> sensors = nodeLevelSensors.remove(key);
+ while (sensors != null && !sensors.isEmpty()) {
+ metrics.removeSensor(sensors.pop());
}
}
}
@@ -188,11 +185,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
final String key = cacheSensorPrefix(taskName, cacheName);
synchronized (cacheLevelSensors) {
- if (cacheLevelSensors.containsKey(key)) {
- while (!cacheLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(cacheLevelSensors.get(key).pop());
- }
- cacheLevelSensors.remove(key);
+ final Deque<String> strings = cacheLevelSensors.remove(key);
+ while (strings != null && !strings.isEmpty()) {
+ metrics.removeSensor(strings.pop());
}
}
}
@@ -225,11 +220,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
final String key = storeSensorPrefix(taskName, storeName);
synchronized (storeLevelSensors) {
- if (storeLevelSensors.containsKey(key)) {
- while (!storeLevelSensors.get(key).isEmpty()) {
- metrics.removeSensor(storeLevelSensors.get(key).pop());
- }
- storeLevelSensors.remove(key);
+ final Deque<String> sensors = storeLevelSensors.remove(key);
+ while (sensors != null && !sensors.isEmpty()) {
+ metrics.removeSensor(sensors.pop());
}
}
}
@@ -413,12 +406,19 @@ public class StreamsMetricsImpl implements StreamsMetrics {
Objects.requireNonNull(sensor, "Sensor is null");
metrics.removeSensor(sensor.name());
- final Sensor parent = parentSensors.get(sensor);
+ final Sensor parent = parentSensors.remove(sensor);
if (parent != null) {
metrics.removeSensor(parent.name());
}
}
+ /**
+ * Visible for testing
+ */
+ Map<Sensor, Sensor> parentSensors() {
+ return Collections.unmodifiableMap(parentSensors);
+ }
+
private static String groupNameFromScope(final String scopeName) {
return "stream-" + scopeName + "-metrics";
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 59%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7ce27b4..cadfdb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.MetricName;
@@ -23,12 +23,21 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Test;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
public class StreamsMetricsImplTest {
@@ -62,6 +71,60 @@ public class StreamsMetricsImplTest {
final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
streamsMetrics.removeSensor(sensor3);
+
+ assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+ }
+
+ @Test
+ public void testMutiLevelSensorRemoval() {
+ final Metrics registry = new Metrics();
+ final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
+ for (final MetricName defaultMetric : registry.metrics().keySet()) {
+ registry.removeMetric(defaultMetric);
+ }
+
+ final String taskName = "taskName";
+ final String operation = "operation";
+ final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
+
+ final String processorNodeName = "processorNodeName";
+ final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
+
+ final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+ addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+ final int numberOfTaskMetrics = registry.metrics().size();
+
+ final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+ addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+ assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+ metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+ assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+ final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+ addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+ assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+ final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+ addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+ assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+ metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+ assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+ metrics.removeAllTaskLevelSensors(taskName);
+
+ assertThat(registry.metrics().size(), equalTo(0));
}
@Test
@@ -115,21 +178,21 @@ public class StreamsMetricsImplTest {
final String operation = "op";
final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
- scope,
- entity,
- operation,
- Sensor.RecordingLevel.INFO
+ scope,
+ entity,
+ operation,
+ Sensor.RecordingLevel.INFO
);
final double latency = 100.0;
final MetricName totalMetricName = metrics.metricName(
- "op-total",
- "stream-scope-metrics",
- "",
- "client-id",
- "",
- "scope-id",
- "entity"
+ "op-total",
+ "stream-scope-metrics",
+ "",
+ "client-id",
+ "",
+ "scope-id",
+ "entity"
);
final KafkaMetric totalMetric = metrics.metric(totalMetricName);