You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/07/28 02:31:17 UTC

[kafka] branch trunk updated: KAFKA-14007: Close header converters during Connect task shutdown (#12309)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0c5f5a7f8b KAFKA-14007: Close header converters during Connect task shutdown (#12309)
0c5f5a7f8b is described below

commit 0c5f5a7f8b3628e991459ba9cff414c675676b8b
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Thu Jul 28 08:01:07 2022 +0530

    KAFKA-14007: Close header converters during Connect task shutdown (#12309)
    
    The HeaderConverter interface extends Closeable, but we weren't closing them anywhere before. This change causes header converters to be closed as part of task shutdown.
    
    Reviewers: Kvicii <42...@users.noreply.github.com>, Chris Egerton <fe...@gmail.com>
---
 .../org/apache/kafka/connect/storage/StringConverter.java     |  4 +++-
 .../java/org/apache/kafka/connect/json/JsonConverter.java     |  4 +++-
 .../org/apache/kafka/connect/converters/NumberConverter.java  |  3 +++
 .../kafka/connect/runtime/AbstractWorkerSourceTask.java       |  1 +
 .../java/org/apache/kafka/connect/runtime/WorkerSinkTask.java |  1 +
 .../apache/kafka/connect/runtime/ErrorHandlingTaskTest.java   | 11 +++++++++++
 .../connect/runtime/ExactlyOnceWorkerSourceTaskTest.java      |  1 +
 .../org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java  |  3 +++
 .../kafka/connect/runtime/WorkerSinkTaskThreadedTest.java     |  8 ++++++++
 .../apache/kafka/connect/runtime/WorkerSourceTaskTest.java    |  8 ++++++++
 10 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 534cdddfa1..69eda3459b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -104,6 +105,7 @@ public class StringConverter implements Converter, HeaderConverter {
 
     @Override
     public void close() {
-        // do nothing
+        Utils.closeQuietly(this.serializer, "string converter serializer");
+        Utils.closeQuietly(this.deserializer, "string converter deserializer");
     }
 }
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 10fde8f20a..6a17ae277b 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -282,7 +283,8 @@ public class JsonConverter implements Converter, HeaderConverter {
 
     @Override
     public void close() {
-        // do nothing
+        Utils.closeQuietly(this.serializer, "JSON converter serializer");
+        Utils.closeQuietly(this.deserializer, "JSON converter deserializer");
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
index 4605b96f5b..c76486eb8b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
@@ -122,5 +123,7 @@ abstract class NumberConverter<T extends Number> implements Converter, HeaderCon
 
     @Override
     public void close() {
+        Utils.closeQuietly(this.serializer, "number converter serializer");
+        Utils.closeQuietly(this.deserializer, "number converter deserializer");
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 407f5fd828..a2a3fce9fe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -313,6 +313,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
         Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
         Utils.closeQuietly(offsetReader, "offset reader");
         Utils.closeQuietly(offsetStore::stop, "offset backing store");
+        Utils.closeQuietly(headerConverter, "header converter");
     }
 
     private void closeProducer(Duration duration) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 01303e308d..dfe815dffc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -176,6 +176,7 @@ class WorkerSinkTask extends WorkerTask {
         Utils.closeQuietly(consumer, "consumer");
         Utils.closeQuietly(transformationChain, "transformation chain");
         Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
+        Utils.closeQuietly(headerConverter, "header converter");
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index d4184bbab2..b2ba417880 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -77,6 +77,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -244,6 +245,9 @@ public class ErrorHandlingTaskTest {
         consumer.close();
         EasyMock.expectLastCall();
 
+        headerConverter.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerSinkTask.initialize(TASK_CONFIG);
@@ -541,6 +545,13 @@ public class ErrorHandlingTaskTest {
 
         offsetStore.stop();
         EasyMock.expectLastCall();
+
+        try {
+            headerConverter.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        EasyMock.expectLastCall();
     }
 
     private void expectTopicCreation(String topic) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index fdc253d04e..44427b5b54 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -1311,6 +1311,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
         expectCall(() -> admin.close(EasyMock.anyObject(Duration.class)));
         expectCall(transformationChain::close);
         expectCall(offsetReader::close);
+        expectCall(headerConverter::close);
     }
 
     private void expectTopicCreation(String topic) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 41d4de5e0a..4aaf764966 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -348,6 +348,9 @@ public class WorkerSinkTaskTest {
         transformationChain.close();
         PowerMock.expectLastCall();
 
+        headerConverter.close();
+        PowerMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index eef6ca82be..cdd87e230d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -56,6 +56,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -552,6 +553,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         consumer.close();
         PowerMock.expectLastCall();
+
+        try {
+            headerConverter.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        PowerMock.expectLastCall();
     }
 
     // Note that this can only be called once per test currently
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 2d2cd00cf5..0366677b17 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -68,6 +68,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 import org.powermock.reflect.Whitebox;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1091,6 +1092,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         offsetStore.stop();
         EasyMock.expectLastCall();
+
+        try {
+            headerConverter.close();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        EasyMock.expectLastCall();
     }
 
     private void expectTopicCreation(String topic) {