You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/07/28 02:31:17 UTC
[kafka] branch trunk updated: KAFKA-14007: Close header converters during Connect task shutdown (#12309)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0c5f5a7f8b KAFKA-14007: Close header converters during Connect task shutdown (#12309)
0c5f5a7f8b is described below
commit 0c5f5a7f8b3628e991459ba9cff414c675676b8b
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Thu Jul 28 08:01:07 2022 +0530
KAFKA-14007: Close header converters during Connect task shutdown (#12309)
The HeaderConverter interface extends Closeable, but we weren't closing them anywhere before. This change causes header converters to be closed as part of task shutdown.
Reviewers: Kvicii <42...@users.noreply.github.com>, Chris Egerton <fe...@gmail.com>
---
.../org/apache/kafka/connect/storage/StringConverter.java | 4 +++-
.../java/org/apache/kafka/connect/json/JsonConverter.java | 4 +++-
.../org/apache/kafka/connect/converters/NumberConverter.java | 3 +++
.../kafka/connect/runtime/AbstractWorkerSourceTask.java | 1 +
.../java/org/apache/kafka/connect/runtime/WorkerSinkTask.java | 1 +
.../apache/kafka/connect/runtime/ErrorHandlingTaskTest.java | 11 +++++++++++
.../connect/runtime/ExactlyOnceWorkerSourceTaskTest.java | 1 +
.../org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java | 3 +++
.../kafka/connect/runtime/WorkerSinkTaskThreadedTest.java | 8 ++++++++
.../apache/kafka/connect/runtime/WorkerSourceTaskTest.java | 8 ++++++++
10 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 534cdddfa1..69eda3459b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -104,6 +105,7 @@ public class StringConverter implements Converter, HeaderConverter {
@Override
public void close() {
- // do nothing
+ Utils.closeQuietly(this.serializer, "string converter serializer");
+ Utils.closeQuietly(this.deserializer, "string converter deserializer");
}
}
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 10fde8f20a..6a17ae277b 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -282,7 +283,8 @@ public class JsonConverter implements Converter, HeaderConverter {
@Override
public void close() {
- // do nothing
+ Utils.closeQuietly(this.serializer, "JSON converter serializer");
+ Utils.closeQuietly(this.deserializer, "JSON converter deserializer");
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
index 4605b96f5b..c76486eb8b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -122,5 +123,7 @@ abstract class NumberConverter<T extends Number> implements Converter, HeaderCon
@Override
public void close() {
+ Utils.closeQuietly(this.serializer, "number converter serializer");
+ Utils.closeQuietly(this.deserializer, "number converter deserializer");
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 407f5fd828..a2a3fce9fe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -313,6 +313,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
Utils.closeQuietly(offsetReader, "offset reader");
Utils.closeQuietly(offsetStore::stop, "offset backing store");
+ Utils.closeQuietly(headerConverter, "header converter");
}
private void closeProducer(Duration duration) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 01303e308d..dfe815dffc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -176,6 +176,7 @@ class WorkerSinkTask extends WorkerTask {
Utils.closeQuietly(consumer, "consumer");
Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+ Utils.closeQuietly(headerConverter, "header converter");
}
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index d4184bbab2..b2ba417880 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -77,6 +77,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -244,6 +245,9 @@ public class ErrorHandlingTaskTest {
consumer.close();
EasyMock.expectLastCall();
+ headerConverter.close();
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerSinkTask.initialize(TASK_CONFIG);
@@ -541,6 +545,13 @@ public class ErrorHandlingTaskTest {
offsetStore.stop();
EasyMock.expectLastCall();
+
+ try {
+ headerConverter.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index fdc253d04e..44427b5b54 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -1311,6 +1311,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
expectCall(() -> admin.close(EasyMock.anyObject(Duration.class)));
expectCall(transformationChain::close);
expectCall(offsetReader::close);
+ expectCall(headerConverter::close);
}
private void expectTopicCreation(String topic) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 41d4de5e0a..4aaf764966 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -348,6 +348,9 @@ public class WorkerSinkTaskTest {
transformationChain.close();
PowerMock.expectLastCall();
+ headerConverter.close();
+ PowerMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index eef6ca82be..cdd87e230d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -56,6 +56,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
+import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -552,6 +553,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
consumer.close();
PowerMock.expectLastCall();
+
+ try {
+ headerConverter.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ PowerMock.expectLastCall();
}
// Note that this can only be called once per test currently
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 2d2cd00cf5..0366677b17 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -68,6 +68,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.powermock.reflect.Whitebox;
+import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -1091,6 +1092,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
offsetStore.stop();
EasyMock.expectLastCall();
+
+ try {
+ headerConverter.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {