You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vi...@apache.org on 2023/05/10 14:42:09 UTC
[kafka] branch trunk updated: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
This is an automated email from the ASF dual-hosted git repository.
viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f17fb75b2de KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
f17fb75b2de is described below
commit f17fb75b2de32512f14cb94a7d1bfb0f37485780
Author: Dániel Urbán <48...@users.noreply.github.com>
AuthorDate: Wed May 10 16:41:52 2023 +0200
KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
Reviewers: Chris Egerton <ch...@aiven.io>, Viktor Somogyi-Vass <vi...@gmail.com>
---
.../connect/runtime/ExactlyOnceWorkerSourceTask.java | 1 +
.../runtime/ExactlyOnceWorkerSourceTaskTest.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 2b9b7aa75f1..30dafaac81d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -233,6 +233,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
@Override
public void removeMetrics() {
Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
+ super.removeMetrics();
}
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 7c920a3d988..9939becc9c1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -82,7 +83,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import static java.util.Collections.emptySet;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -291,6 +294,23 @@ public class ExactlyOnceWorkerSourceTaskTest {
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
}
+ @Test
+ public void testRemoveMetrics() {
+ createWorkerTask();
+
+ workerTask.removeMetrics();
+
+ assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
+ }
+
+ private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
+ return metricNames
+ .stream()
+ .filter(m -> metrics.registry().taskGroupName().equals(m.group())
+ || metrics.registry().sourceTaskGroupName().equals(m.group()))
+ .collect(Collectors.toSet());
+ }
+
@Test
public void testStartPaused() throws Exception {
createWorkerTask(TargetState.PAUSED);