You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/12/04 21:45:19 UTC

[kafka] branch 2.1 updated: KAFKA-7660: fix streams and Metrics memory leaks (#5979)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2b50cbc  KAFKA-7660: fix streams and Metrics memory leaks (#5979)
2b50cbc is described below

commit 2b50cbc302376835e3e48151533c041f804058a8
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Dec 4 15:45:08 2018 -0600

    KAFKA-7660: fix streams and Metrics memory leaks (#5979)
---
 .../org/apache/kafka/common/metrics/Metrics.java   |  5 ++
 .../org/apache/kafka/common/metrics/Sensor.java    | 10 +++-
 .../apache/kafka/common/metrics/MetricsTest.java   | 16 ++++++
 .../streams/processor/internals/ProcessorNode.java | 23 ++++----
 .../internals/metrics/StreamsMetricsImpl.java      | 43 +++++++-------
 .../{ => metrics}/StreamsMetricsImplTest.java      | 67 +++++++++++++++++++++-
 6 files changed, 127 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index a6da9f9..9e2b6f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.emptyList;
+
 /**
  * A registry of sensors and metrics.
  * <p>
@@ -446,6 +448,9 @@ public class Metrics implements Closeable {
                             removeMetric(metric.metricName());
                         log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
+                        for (final Sensor parent : sensor.parents()) {
+                            childrenSensors.getOrDefault(parent, emptyList()).remove(sensor);
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ccbe8aa..1af9419 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -32,6 +31,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
 /**
  * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -133,6 +135,10 @@ public final class Sensor {
         return this.name;
     }
 
+    List<Sensor> parents() {
+        return unmodifiableList(asList(parents));
+    }
+
     /**
      * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)}
      */
@@ -291,7 +297,7 @@ public final class Sensor {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
+        return unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index eb3f775..992abcb 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -203,6 +205,20 @@ public class MetricsTest {
     }
 
     @Test
+    public void testRemoveChildSensor() {
+        final Metrics metrics = new Metrics();
+
+        final Sensor parent = metrics.sensor("parent");
+        final Sensor child = metrics.sensor("child", parent);
+
+        assertEquals(singletonList(child), metrics.childrenSensors().get(parent));
+
+        metrics.removeSensor("child");
+
+        assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+    }
+
+    @Test
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
         Sensor parent1 = metrics.sensor("test.parent1");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8dc6417..ec4f8e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
@@ -165,15 +167,13 @@ public class ProcessorNode<K, V> {
         private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) {
             this.metrics = metrics;
 
-            final String group = "stream-processor-node-metrics";
             final String taskName = context.taskId().toString();
-            final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", processorNodeName);
-            final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), "processor-node-id", "all");
+            final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName);
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all");
 
             nodeProcessTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "process",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -183,7 +183,6 @@ public class ProcessorNode<K, V> {
             nodePunctuateTimeSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "punctuate",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -193,7 +192,6 @@ public class ProcessorNode<K, V> {
             nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "create",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -204,7 +202,6 @@ public class ProcessorNode<K, V> {
             nodeDestructionSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "destroy",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -214,7 +211,6 @@ public class ProcessorNode<K, V> {
             sourceNodeForwardSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "forward",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -231,17 +227,18 @@ public class ProcessorNode<K, V> {
 
         private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String operation,
                                                                            final StreamsMetricsImpl metrics,
-                                                                           final String group,
                                                                            final String taskName,
                                                                            final String processorNodeName,
                                                                            final Map<String, String> taskTags,
                                                                            final Map<String, String> nodeTags) {
             final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
-            addAvgMaxLatency(parent, group, taskTags, operation);
-            addInvocationRateAndCount(parent, group, taskTags, operation);
+            addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+            addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
             final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
-            addAvgMaxLatency(sensor, group, nodeTags, operation);
-            addInvocationRateAndCount(sensor, group, nodeTags, operation);
+            addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+            addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
             return sensor;
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 1703112..dd6cc4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -51,6 +51,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     private static final String SENSOR_PREFIX_DELIMITER = ".";
     private static final String SENSOR_NAME_DELIMITER = ".s.";
 
+    public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
+    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.threadName = threadName;
@@ -112,11 +115,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public final void removeAllTaskLevelSensors(final String taskName) {
         final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
-            if (taskLevelSensors.containsKey(key)) {
-                while (!taskLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(taskLevelSensors.get(key).pop());
-                }
-                taskLevelSensors.remove(key);
+            final Deque<String> sensors = taskLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -149,10 +150,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
         final String key = nodeSensorPrefix(taskName, processorNodeName);
         synchronized (nodeLevelSensors) {
-            if (nodeLevelSensors.containsKey(key)) {
-                while (!nodeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(nodeLevelSensors.get(key).pop());
-                }
+            final Deque<String> sensors = nodeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -185,11 +185,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
         final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
-            if (cacheLevelSensors.containsKey(key)) {
-                while (!cacheLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
-                }
-                cacheLevelSensors.remove(key);
+            final Deque<String> strings = cacheLevelSensors.remove(key);
+            while (strings != null && !strings.isEmpty()) {
+                metrics.removeSensor(strings.pop());
             }
         }
     }
@@ -222,11 +220,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
         final String key = storeSensorPrefix(taskName, storeName);
         synchronized (storeLevelSensors) {
-            if (storeLevelSensors.containsKey(key)) {
-                while (!storeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(storeLevelSensors.get(key).pop());
-                }
-                storeLevelSensors.remove(key);
+            final Deque<String> sensors = storeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -410,12 +406,19 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         Objects.requireNonNull(sensor, "Sensor is null");
         metrics.removeSensor(sensor.name());
 
-        final Sensor parent = parentSensors.get(sensor);
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
     }
 
+    /**
+     * Visible for testing
+     */
+    Map<Sensor, Sensor> parentSensors() {
+        return Collections.unmodifiableMap(parentSensors);
+    }
+
     private static String groupNameFromScope(final String scopeName) {
         return "stream-" + scopeName + "-metrics";
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 62%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7ce27b4..38629c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 
 import org.apache.kafka.common.MetricName;
@@ -23,12 +23,21 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class StreamsMetricsImplTest {
 
@@ -62,6 +71,60 @@ public class StreamsMetricsImplTest {
 
         final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+    }
+
+    @Test
+    public void testMutiLevelSensorRemoval() {
+        final Metrics registry = new Metrics();
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
+        for (final MetricName defaultMetric : registry.metrics().keySet()) {
+            registry.removeMetric(defaultMetric);
+        }
+
+        final String taskName = "taskName";
+        final String operation = "operation";
+        final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
+
+        final String processorNodeName = "processorNodeName";
+        final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
+
+        final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+        final int numberOfTaskMetrics = registry.metrics().size();
+
+        final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+        addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+        assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+        addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+        assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertThat(registry.metrics().size(), equalTo(0));
     }
 
     @Test