You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/19 18:18:50 UTC

[kafka] branch 2.7 updated: KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 333bd7c  KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
333bd7c is described below

commit 333bd7ce9fefd22647f7f2729cd528894e3e8bd9
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Feb 19 11:49:56 2021 -0600

    KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
    
    These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects a AdminClient resource leak if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../connect/storage/KafkaConfigBackingStore.java      | 19 +++++++++++++++++--
 .../connect/storage/KafkaOffsetBackingStore.java      | 19 +++++++++++++++++--
 .../connect/storage/KafkaStatusBackingStore.java      | 19 +++++++++++++++++--
 3 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index d4e6358..dcfc28c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,6 +227,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private SharedTopicAdmin ownTopicAdmin;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -291,7 +293,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     @Override
     public void stop() {
         log.info("Closing KafkaConfigBackingStore");
-        configLog.stop();
+        try {
+            configLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
         log.info("Closed KafkaConfigBackingStore");
     }
 
@@ -479,7 +487,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).configStorageTopicSettings()
                                             : Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 26b47f9..313baf7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private SharedTopicAdmin ownTopicAdmin;
 
     @Deprecated
     public KafkaOffsetBackingStore() {
@@ -98,7 +100,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).offsetStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -140,7 +149,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-        offsetLog.stop();
+        try {
+            offsetLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
         log.info("Stopped KafkaOffsetBackingStore");
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index abdbba8..bb73e46 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.Table;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
@@ -134,6 +135,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
+    private SharedTopicAdmin ownTopicAdmin;
 
     @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
@@ -177,7 +179,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -226,7 +235,13 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
     @Override
     public void stop() {
-        kafkaLog.stop();
+        try {
+            kafkaLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
     }
 
     @Override