You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vi...@apache.org on 2023/05/11 11:03:13 UTC

[kafka] branch 3.4 updated: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)

This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 16d08e9e631 KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
16d08e9e631 is described below

commit 16d08e9e631aa60e6ef400735394179037262126
Author: Viktor Somogyi-Vass <vi...@gmail.com>
AuthorDate: Thu May 11 13:03:04 2023 +0200

    KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
    
    Reviewers: Chris Egerton <ch...@aiven.io>, Viktor Somogyi-Vass <vi...@gmail.com>
    
    Co-authored-by: Dániel Urbán <48...@users.noreply.github.com>
---
 .../connect/runtime/ExactlyOnceWorkerSourceTask.java |  1 +
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java     | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index e0fc987a3da..47269fb354e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -233,6 +233,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
     @Override
     public void removeMetrics() {
         Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
+        super.removeMetrics();
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 7882b862c84..9edaf08efe6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -82,7 +83,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import static java.util.Collections.emptySet;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -291,6 +294,23 @@ public class ExactlyOnceWorkerSourceTaskTest {
                 sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
     }
 
+    @Test
+    public void testRemoveMetrics() {
+        createWorkerTask();
+
+        workerTask.removeMetrics();
+
+        assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
+    }
+
+    private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
+        return metricNames
+                .stream()
+                .filter(m -> metrics.registry().taskGroupName().equals(m.group())
+                        || metrics.registry().sourceTaskGroupName().equals(m.group()))
+                .collect(Collectors.toSet());
+    }
+
     @Test
     public void testStartPaused() throws Exception {
         createWorkerTask(TargetState.PAUSED);