You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/12/07 22:00:39 UTC

[kafka] branch 3.4 updated: KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 4ab0c54a5d8 KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)
4ab0c54a5d8 is described below

commit 4ab0c54a5d872add78a0cb20720fd33fa5469c4d
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Dec 7 13:59:46 2022 -0800

    KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)
    
    Reviewers: Omnia G H Ibrahim <o....@gmail.com>, Chris Egerton <ch...@aiven.io>
---
 .../org/apache/kafka/connect/mirror/MirrorClient.java  |  3 +--
 .../connect/mirror/MirrorCheckpointConnector.java      | 10 ++++++++--
 .../kafka/connect/mirror/MirrorHeartbeatConnector.java | 13 ++++++++++---
 .../kafka/connect/mirror/MirrorSourceConnector.java    | 18 +++++++++++-------
 4 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index cd25429a449..b40b6ba552f 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ForwardingAdmin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -72,7 +71,7 @@ public class MirrorClient implements AutoCloseable {
     }
 
     // for testing
-    MirrorClient(ForwardingAdmin adminClient, ReplicationPolicy replicationPolicy,
+    MirrorClient(Admin adminClient, ReplicationPolicy replicationPolicy,
             Map<String, Object> consumerConfig) {
         this.adminClient = adminClient;
         this.replicationPolicy = replicationPolicy;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 3885b452f39..47d7b5a9d86 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -48,6 +48,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
     private MirrorCheckpointConfig config;
     private GroupFilter groupFilter;
     private Admin sourceAdminClient;
+    private Admin targetAdminClient;
     private SourceAndTarget sourceAndTarget;
     private List<String> knownConsumerGroups = Collections.emptyList();
 
@@ -71,6 +72,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
         sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
         groupFilter = config.groupFilter();
         sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
+        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
         scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
         scheduler.execute(this::createInternalTopics, "creating internal topics");
         scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
@@ -88,6 +90,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
         Utils.closeQuietly(scheduler, "scheduler");
         Utils.closeQuietly(groupFilter, "group filter");
         Utils.closeQuietly(sourceAdminClient, "source admin client");
+        Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
     @Override
@@ -159,8 +162,11 @@ public class MirrorCheckpointConnector extends SourceConnector {
     }
 
     private void createInternalTopics() {
-        MirrorUtils.createSinglePartitionCompactedTopic(config.checkpointsTopic(),
-            config.checkpointsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig()));
+        MirrorUtils.createSinglePartitionCompactedTopic(
+                config.checkpointsTopic(),
+                config.checkpointsTopicReplicationFactor(),
+                targetAdminClient
+        );
     } 
 
     boolean shouldReplicate(String group) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
index 07ee442d0de..0464f6f3952 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
@@ -33,7 +34,8 @@ import java.util.Collections;
 public class MirrorHeartbeatConnector extends SourceConnector {
     private MirrorHeartbeatConfig config;
     private Scheduler scheduler;
-    
+    private Admin targetAdminClient;
+
     public MirrorHeartbeatConnector() {
         // nop
     }
@@ -46,6 +48,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
     @Override
     public void start(Map<String, String> props) {
         config = new MirrorHeartbeatConfig(props);
+        targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
         scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout());
         scheduler.execute(this::createInternalTopics, "creating internal topics");
     }
@@ -53,6 +56,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
     @Override
     public void stop() {
         Utils.closeQuietly(scheduler, "scheduler");
+        Utils.closeQuietly(targetAdminClient, "target admin client");
     }
 
     @Override
@@ -82,7 +86,10 @@ public class MirrorHeartbeatConnector extends SourceConnector {
     }
 
     private void createInternalTopics() {
-        MirrorUtils.createSinglePartitionCompactedTopic(config.heartbeatsTopic(),
-            config.heartbeatsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig()));
+        MirrorUtils.createSinglePartitionCompactedTopic(
+                config.heartbeatsTopic(),
+                config.heartbeatsTopicReplicationFactor(),
+                targetAdminClient
+        );
     }
 }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 8177e3e1cbf..b49daf0ff76 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.mirror;
 
 import java.util.Map.Entry;
 
-import org.apache.kafka.clients.admin.ForwardingAdmin;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
@@ -80,8 +80,9 @@ public class MirrorSourceConnector extends SourceConnector {
     private List<TopicPartition> knownTargetTopicPartitions = Collections.emptyList();
     private ReplicationPolicy replicationPolicy;
     private int replicationFactor;
-    private ForwardingAdmin sourceAdminClient;
-    private ForwardingAdmin targetAdminClient;
+    private Admin sourceAdminClient;
+    private Admin targetAdminClient;
+    private Admin offsetSyncsAdminClient;
 
     public MirrorSourceConnector() {
         // nop
@@ -117,6 +118,7 @@ public class MirrorSourceConnector extends SourceConnector {
         replicationFactor = config.replicationFactor();
         sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
         targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
+        offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
         scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout());
         scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
         scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
@@ -142,6 +144,7 @@ public class MirrorSourceConnector extends SourceConnector {
         Utils.closeQuietly(configPropertyFilter, "config property filter");
         Utils.closeQuietly(sourceAdminClient, "source admin client");
         Utils.closeQuietly(targetAdminClient, "target admin client");
+        Utils.closeQuietly(offsetSyncsAdminClient, "offset syncs admin client");
         log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start);
     }
 
@@ -306,9 +309,10 @@ public class MirrorSourceConnector extends SourceConnector {
     }
 
     private void createOffsetSyncsTopic() {
-        MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(),
+        MirrorUtils.createSinglePartitionCompactedTopic(
+                config.offsetSyncsTopic(),
                 config.offsetSyncsTopicReplicationFactor(),
-                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())
+                offsetSyncsAdminClient
         );
     }
 
@@ -393,7 +397,7 @@ public class MirrorSourceConnector extends SourceConnector {
         }));
     }
 
-    private Set<String> listTopics(ForwardingAdmin adminClient)
+    private Set<String> listTopics(Admin adminClient)
             throws InterruptedException, ExecutionException {
         return adminClient.listTopics().names().get();
     }
@@ -403,7 +407,7 @@ public class MirrorSourceConnector extends SourceConnector {
         return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
     }
 
-    private static Collection<TopicDescription> describeTopics(ForwardingAdmin adminClient, Collection<String> topics)
+    private static Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics)
             throws InterruptedException, ExecutionException {
         return adminClient.describeTopics(topics).allTopicNames().get().values();
     }