You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/12/07 22:00:39 UTC
[kafka] branch 3.4 updated: KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 4ab0c54a5d8 KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)
4ab0c54a5d8 is described below
commit 4ab0c54a5d872add78a0cb20720fd33fa5469c4d
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Dec 7 13:59:46 2022 -0800
KAFKA-14443: Close topic creation Admin clients in MM2 connectors (#12955)
Reviewers: Omnia G H Ibrahim <o....@gmail.com>, Chris Egerton <ch...@aiven.io>
---
.../org/apache/kafka/connect/mirror/MirrorClient.java | 3 +--
.../connect/mirror/MirrorCheckpointConnector.java | 10 ++++++++--
.../kafka/connect/mirror/MirrorHeartbeatConnector.java | 13 ++++++++++---
.../kafka/connect/mirror/MirrorSourceConnector.java | 18 +++++++++++-------
4 files changed, 30 insertions(+), 14 deletions(-)
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
index cd25429a449..b40b6ba552f 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -18,7 +18,6 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -72,7 +71,7 @@ public class MirrorClient implements AutoCloseable {
}
// for testing
- MirrorClient(ForwardingAdmin adminClient, ReplicationPolicy replicationPolicy,
+ MirrorClient(Admin adminClient, ReplicationPolicy replicationPolicy,
Map<String, Object> consumerConfig) {
this.adminClient = adminClient;
this.replicationPolicy = replicationPolicy;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 3885b452f39..47d7b5a9d86 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -48,6 +48,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
private MirrorCheckpointConfig config;
private GroupFilter groupFilter;
private Admin sourceAdminClient;
+ private Admin targetAdminClient;
private SourceAndTarget sourceAndTarget;
private List<String> knownConsumerGroups = Collections.emptyList();
@@ -71,6 +72,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
groupFilter = config.groupFilter();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
+ targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics");
scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
@@ -88,6 +90,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
Utils.closeQuietly(scheduler, "scheduler");
Utils.closeQuietly(groupFilter, "group filter");
Utils.closeQuietly(sourceAdminClient, "source admin client");
+ Utils.closeQuietly(targetAdminClient, "target admin client");
}
@Override
@@ -159,8 +162,11 @@ public class MirrorCheckpointConnector extends SourceConnector {
}
private void createInternalTopics() {
- MirrorUtils.createSinglePartitionCompactedTopic(config.checkpointsTopic(),
- config.checkpointsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig()));
+ MirrorUtils.createSinglePartitionCompactedTopic(
+ config.checkpointsTopic(),
+ config.checkpointsTopicReplicationFactor(),
+ targetAdminClient
+ );
}
boolean shouldReplicate(String group) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
index 07ee442d0de..0464f6f3952 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.mirror;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
@@ -33,7 +34,8 @@ import java.util.Collections;
public class MirrorHeartbeatConnector extends SourceConnector {
private MirrorHeartbeatConfig config;
private Scheduler scheduler;
-
+ private Admin targetAdminClient;
+
public MirrorHeartbeatConnector() {
// nop
}
@@ -46,6 +48,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override
public void start(Map<String, String> props) {
config = new MirrorHeartbeatConfig(props);
+ targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout());
scheduler.execute(this::createInternalTopics, "creating internal topics");
}
@@ -53,6 +56,7 @@ public class MirrorHeartbeatConnector extends SourceConnector {
@Override
public void stop() {
Utils.closeQuietly(scheduler, "scheduler");
+ Utils.closeQuietly(targetAdminClient, "target admin client");
}
@Override
@@ -82,7 +86,10 @@ public class MirrorHeartbeatConnector extends SourceConnector {
}
private void createInternalTopics() {
- MirrorUtils.createSinglePartitionCompactedTopic(config.heartbeatsTopic(),
- config.heartbeatsTopicReplicationFactor(), config.forwardingAdmin(config.targetAdminConfig()));
+ MirrorUtils.createSinglePartitionCompactedTopic(
+ config.heartbeatsTopic(),
+ config.heartbeatsTopicReplicationFactor(),
+ targetAdminClient
+ );
}
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 8177e3e1cbf..b49daf0ff76 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.mirror;
import java.util.Map.Entry;
-import org.apache.kafka.clients.admin.ForwardingAdmin;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
@@ -80,8 +80,9 @@ public class MirrorSourceConnector extends SourceConnector {
private List<TopicPartition> knownTargetTopicPartitions = Collections.emptyList();
private ReplicationPolicy replicationPolicy;
private int replicationFactor;
- private ForwardingAdmin sourceAdminClient;
- private ForwardingAdmin targetAdminClient;
+ private Admin sourceAdminClient;
+ private Admin targetAdminClient;
+ private Admin offsetSyncsAdminClient;
public MirrorSourceConnector() {
// nop
@@ -117,6 +118,7 @@ public class MirrorSourceConnector extends SourceConnector {
replicationFactor = config.replicationFactor();
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig());
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig());
+ offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
@@ -142,6 +144,7 @@ public class MirrorSourceConnector extends SourceConnector {
Utils.closeQuietly(configPropertyFilter, "config property filter");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
+ Utils.closeQuietly(offsetSyncsAdminClient, "offset syncs admin client");
log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start);
}
@@ -306,9 +309,10 @@ public class MirrorSourceConnector extends SourceConnector {
}
private void createOffsetSyncsTopic() {
- MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(),
+ MirrorUtils.createSinglePartitionCompactedTopic(
+ config.offsetSyncsTopic(),
config.offsetSyncsTopicReplicationFactor(),
- config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())
+ offsetSyncsAdminClient
);
}
@@ -393,7 +397,7 @@ public class MirrorSourceConnector extends SourceConnector {
}));
}
- private Set<String> listTopics(ForwardingAdmin adminClient)
+ private Set<String> listTopics(Admin adminClient)
throws InterruptedException, ExecutionException {
return adminClient.listTopics().names().get();
}
@@ -403,7 +407,7 @@ public class MirrorSourceConnector extends SourceConnector {
return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
}
- private static Collection<TopicDescription> describeTopics(ForwardingAdmin adminClient, Collection<String> topics)
+ private static Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics)
throws InterruptedException, ExecutionException {
return adminClient.describeTopics(topics).allTopicNames().get().values();
}