You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vi...@apache.org on 2023/05/10 14:42:09 UTC

[kafka] branch trunk updated: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)

This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f17fb75b2de KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
f17fb75b2de is described below

commit f17fb75b2de32512f14cb94a7d1bfb0f37485780
Author: Dániel Urbán <48...@users.noreply.github.com>
AuthorDate: Wed May 10 16:41:52 2023 +0200

    KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
    
    Reviewers: Chris Egerton <ch...@aiven.io>, Viktor Somogyi-Vass <vi...@gmail.com>
---
 .../connect/runtime/ExactlyOnceWorkerSourceTask.java |  1 +
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java     | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 2b9b7aa75f1..30dafaac81d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -233,6 +233,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
     @Override
     public void removeMetrics() {
         Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
+        super.removeMetrics();
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 7c920a3d988..9939becc9c1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -82,7 +83,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import static java.util.Collections.emptySet;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -291,6 +294,23 @@ public class ExactlyOnceWorkerSourceTaskTest {
                 sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
     }
 
+    @Test
+    public void testRemoveMetrics() {
+        createWorkerTask();
+
+        workerTask.removeMetrics();
+
+        assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
+    }
+
+    private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
+        return metricNames
+                .stream()
+                .filter(m -> metrics.registry().taskGroupName().equals(m.group())
+                        || metrics.registry().sourceTaskGroupName().equals(m.group()))
+                .collect(Collectors.toSet());
+    }
+
     @Test
     public void testStartPaused() throws Exception {
         createWorkerTask(TargetState.PAUSED);