You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/01/06 05:24:54 UTC
[kafka] branch trunk updated: KAFKA-6252;
Close the metric group to clean up any existing metrics
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b03c9d KAFKA-6252; Close the metric group to clean up any existing metrics
3b03c9d is described below
commit 3b03c9d3484f59c7a5fccb282ff77ad45734c10c
Author: Arjun Satish <ar...@confluent.io>
AuthorDate: Fri Jan 5 21:23:39 2018 -0800
KAFKA-6252; Close the metric group to clean up any existing metrics
We are closing the metricGroups created in a Worker, Source task and Sink task before populating them with new metrics. This helps in cases where an Exception is thrown when previously created groups were not cleaned up correctly.
Signed-off-by: Arjun Satish <arjunconfluent.io>
Author: Arjun Satish <ar...@confluent.io>
Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4397 from wicknicks/KAFKA-6252
---
.../kafka/connect/runtime/WorkerConnector.java | 2 +
.../kafka/connect/runtime/WorkerSinkTask.java | 2 +
.../kafka/connect/runtime/WorkerSourceTask.java | 2 +
.../apache/kafka/connect/runtime/WorkerTask.java | 2 +
.../kafka/connect/runtime/ConnectMetricsTest.java | 44 +++++++++++++++++++++-
5 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 9e65cd2..9b934f3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -247,6 +247,8 @@ public class WorkerConnector {
ConnectMetricsRegistry registry = connectMetrics.registry();
this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
registry.connectorTagName(), connName);
+ // prevent collisions by removing any previously created metrics in this group.
+ metricGroup.close();
metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 05ace58..6961494 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -674,6 +674,8 @@ class WorkerSinkTask extends WorkerTask {
metricGroup = connectMetrics
.group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(),
Integer.toString(id.task()));
+ // prevent collisions by removing any previously created metrics in this group.
+ metricGroup.close();
sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9072cd4..a172cdb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -494,6 +494,8 @@ class WorkerSourceTask extends WorkerTask {
metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
registry.connectorTagName(), id.connector(),
registry.taskTagName(), Integer.toString(id.task()));
+ // remove any previously created metrics in this group to prevent collisions.
+ metricGroup.close();
sourceRecordPoll = metricGroup.sensor("source-record-poll");
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ec06924..d563f9b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -313,6 +313,8 @@ abstract class WorkerTask implements Runnable {
metricGroup = connectMetrics.group(registry.taskGroupName(),
registry.connectorTagName(), id.connector(),
registry.taskTagName(), Integer.toString(id.task()));
+ // prevent collisions by removing any previously created metrics in this group.
+ metricGroup.close();
metricGroup.addValueMetric(registry.taskStatus, new LiteralSupplier<String>() {
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index d496cbe..60bd863 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -16,13 +16,19 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -136,4 +142,40 @@ public class ConnectMetricsTest {
assertNotNull(id1.tags());
assertNotNull(id2.tags());
}
-}
\ No newline at end of file
+
+ @Test
+ public void testRecreateWithClose() {
+ int numMetrics = addToGroup(metrics, false);
+ int numMetricsInRecreatedGroup = addToGroup(metrics, true);
+ Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRecreateWithoutClose() {
+ int numMetrics = addToGroup(metrics, false);
+ int numMetricsInRecreatedGroup = addToGroup(metrics, false);
+ // we should never get here
+ throw new RuntimeException("Created " + numMetricsInRecreatedGroup
+ + " metrics in recreated group. Original=" + numMetrics);
+ }
+
+ private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
+ registry.connectorTagName(), "conn_name");
+
+ if (shouldClose) {
+ metricGroup.close();
+ }
+
+ Sensor sensor = metricGroup.sensor("my_sensor");
+ sensor.add(metricName("x1"), new Max());
+ sensor.add(metricName("y2"), new Avg());
+
+ return metricGroup.metrics().metrics().size();
+ }
+
+ static MetricName metricName(String name) {
+ return new MetricName(name, "test_group", "metrics for testing", Collections.<String, String>emptyMap());
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].