You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/01/06 05:24:54 UTC

[kafka] branch trunk updated: KAFKA-6252; Close the metric group to clean up any existing metrics

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b03c9d  KAFKA-6252; Close the metric group to clean up any existing metrics
3b03c9d is described below

commit 3b03c9d3484f59c7a5fccb282ff77ad45734c10c
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Fri Jan 5 21:23:39 2018 -0800

    KAFKA-6252; Close the metric group to clean up any existing metrics
    
    We are closing the metricGroups created in a Worker, Source task and Sink task before populating them with new metrics. This helps in cases where an Exception is thrown when previously created groups were not cleaned up correctly.
    
    Signed-off-by: Arjun Satish <arjunconfluent.io>
    
    Author: Arjun Satish <ar...@confluent.io>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4397 from wicknicks/KAFKA-6252
---
 .../kafka/connect/runtime/WorkerConnector.java     |  2 +
 .../kafka/connect/runtime/WorkerSinkTask.java      |  2 +
 .../kafka/connect/runtime/WorkerSourceTask.java    |  2 +
 .../apache/kafka/connect/runtime/WorkerTask.java   |  2 +
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 44 +++++++++++++++++++++-
 5 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2..9b934f3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public class WorkerConnector {
             ConnectMetricsRegistry registry = connectMetrics.registry();
             this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
                     registry.connectorTagName(), connName);
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
             metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 05ace58..6961494 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -674,6 +674,8 @@ class WorkerSinkTask extends WorkerTask {
             metricGroup = connectMetrics
                                   .group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(),
                                          Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
             sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd4..a172cdb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ class WorkerSourceTask extends WorkerTask {
             metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // remove any previously created metrics in this group to prevent collisions.
+            metricGroup.close();
 
             sourceRecordPoll = metricGroup.sensor("source-record-poll");
             sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec06924..d563f9b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ abstract class WorkerTask implements Runnable {
             metricGroup = connectMetrics.group(registry.taskGroupName(),
                     registry.connectorTagName(), id.connector(),
                     registry.taskTagName(), Integer.toString(id.task()));
+            // prevent collisions by removing any previously created metrics in this group.
+            metricGroup.close();
 
             metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() {
                 @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index d496cbe..60bd863 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,13 +16,19 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -136,4 +142,40 @@ public class ConnectMetricsTest {
         assertNotNull(id1.tags());
         assertNotNull(id2.tags());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testRecreateWithClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
+        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRecreateWithoutClose() {
+        int numMetrics = addToGroup(metrics, false);
+        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
+        // we should never get here
+        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
+                + " metrics in recreated group. Original=" + numMetrics);
+    }
+
+    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
+                registry.connectorTagName(), "conn_name");
+
+        if (shouldClose) {
+            metricGroup.close();
+        }
+
+        Sensor sensor = metricGroup.sensor("my_sensor");
+        sensor.add(metricName("x1"), new Max());
+        sensor.add(metricName("y2"), new Avg());
+
+        return metricGroup.metrics().metrics().size();
+    }
+
+    static MetricName metricName(String name) {
+        return new MetricName(name, "test_group", "metrics for testing", Collections.<String, String>emptyMap());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].