Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/10 01:59:16 UTC

[kafka] branch 2.6 updated: Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new a1b9e2b  Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"
a1b9e2b is described below

commit a1b9e2baba14ec70b300f30a7649691700097f19
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Tue Feb 9 19:56:21 2021 -0600

    Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"
    
    This reverts commit 36ebed01bb5c104e7b07e384cdab5f480e811d2c.
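    
    For context, the shape of the change being rolled back, as a minimal sketch
    pieced together from the diff below (illustrative only, not code from this commit):
    
        // Before the revert (KAFKA-10021): one shared admin client is created up
        // front, handed to each backing store, and closed by the herder at shutdown.
        Map<String, Object> adminProps = new HashMap<>(config.originals());
        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
        KafkaOffsetBackingStore offsetStore = new KafkaOffsetBackingStore(sharedAdmin);
    
        // After the revert: back to the no-argument constructor; each store creates
        // its own short-lived admin client internally when it starts.
        KafkaOffsetBackingStore revertedOffsetStore = new KafkaOffsetBackingStore();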
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   |  17 +-
 .../kafka/connect/cli/ConnectDistributed.java      |  18 +-
 .../runtime/distributed/DistributedHerder.java     |  37 +----
 .../connect/storage/KafkaConfigBackingStore.java   |  38 ++---
 .../connect/storage/KafkaOffsetBackingStore.java   |  42 ++---
 .../connect/storage/KafkaStatusBackingStore.java   |  38 ++---
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  77 +--------
 .../kafka/connect/util/SharedTopicAdmin.java       | 145 ----------------
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  67 --------
 .../runtime/distributed/DistributedHerderTest.java |  13 +-
 .../storage/KafkaConfigBackingStoreTest.java       |   6 +-
 .../storage/KafkaOffsetBackingStoreTest.java       |   6 +-
 .../kafka/connect/util/SharedTopicAdminTest.java   | 112 -------------
 .../apache/kafka/connect/util/TopicAdminTest.java  | 183 ---------------------
 14 files changed, 75 insertions(+), 724 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 839f5dc..fa73f8d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -36,7 +36,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -234,28 +233,20 @@ public class MirrorMaker {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        // Create the admin client to be shared by all backing stores for this herder
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
-        ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
-        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
         offsetBackingStore.configure(distributedConfig);
         Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
         statusBackingStore.configure(distributedConfig);
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 distributedConfig,
-                configTransformer,
-                sharedAdmin);
-        // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
-        // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
-        // tracking the various shared admin objects in this class.
+                configTransformer);
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
         herders.put(sourceAndTarget, herder);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8d93e79..22c1ad8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -36,14 +36,12 @@ import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -103,12 +101,7 @@ public class ConnectDistributed {
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        // Create the admin client to be shared by all backing stores.
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
-        ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
-        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
         offsetBackingStore.configure(config);
 
         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
@@ -119,20 +112,17 @@ public class ConnectDistributed {
         WorkerConfigTransformer configTransformer = worker.configTransformer();
 
         Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
         statusBackingStore.configure(config);
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                 internalValueConverter,
                 config,
-                configTransformer,
-                sharedAdmin);
+                configTransformer);
 
-        // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
-        // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl.toString(), connectorClientConfigOverridePolicy);
 
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b6dab25..4879a1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -68,7 +67,6 @@ import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -141,7 +139,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final Time time;
     private final HerderMetrics herderMetrics;
-    private final List<AutoCloseable> uponShutdown;
 
     private final String workerGroupId;
     private final int workerSyncTimeoutMs;
@@ -189,22 +186,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private final DistributedConfig config;
 
-    /**
-     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
-     * that have the same group ID.
-     *
-     * @param config             the configuration for the worker; may not be null
-     * @param time               the clock to use; may not be null
-     * @param worker             the {@link Worker} instance to use; may not be null
-     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
-     * @param statusBackingStore the backing store for statuses; may not be null
-     * @param configBackingStore the backing store for connector configurations; may not be null
-     * @param restUrl            the URL of this herder's REST API; may not be null
-     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
-     *                                            in connector configurations; may not be null
-     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
-     *                           after all services and resources owned by this herder are stopped
-     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
@@ -212,10 +193,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
-                             AutoCloseable... uponShutdown) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
-             time, connectorClientConfigOverridePolicy, uponShutdown);
+             time, connectorClientConfigOverridePolicy);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -230,8 +210,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
-                      AutoCloseable... uponShutdown) {
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
 
         this.time = time;
@@ -245,7 +224,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
         this.keyGenerator = config.getInternalRequestKeyGenerator();
         this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
-        this.uponShutdown = Arrays.asList(uponShutdown);
 
         String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@@ -700,15 +678,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     @Override
-    protected void stopServices() {
-        try {
-            super.stopServices();
-        } finally {
-            this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
-        }
-    }
-
-    @Override
     public void stop() {
         log.info("Herder stopping");
 
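
The stopServices() override removed above implements a small "close extras after
owned services" pattern: stop everything the herder owns, then close any
caller-supplied AutoCloseable objects, tolerating per-resource failures. A
self-contained sketch of the same idea (hypothetical class, not Connect code):

    import java.util.Arrays;
    import java.util.List;

    class ShutdownOrderSketch {
        private final List<AutoCloseable> uponShutdown;

        ShutdownOrderSketch(AutoCloseable... uponShutdown) {
            this.uponShutdown = Arrays.asList(uponShutdown);
        }

        void stopServices() {
            try {
                // stop all services and resources owned by this component first
            } finally {
                // then close caller-supplied resources; failures are swallowed so
                // one bad close cannot skip the rest (the diff uses Utils.closeQuietly)
                for (AutoCloseable closeable : uponShutdown) {
                    try {
                        if (closeable != null)
                            closeable.close();
                    } catch (Throwable t) {
                        // log and continue
                    }
                }
            }
        }
    }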
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index d4e6358..fbcc35b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,7 +62,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
 
 /**
  * <p>
@@ -225,7 +224,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     // Connector and task configs: name or id -> config map
     private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-    private final Supplier<TopicAdmin> topicAdminSupplier;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -243,17 +241,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     private final WorkerConfigTransformer configTransformer;
 
-    @Deprecated
     public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
-        this(converter, config, configTransformer, null);
-    }
-
-    public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
         this.lock = new Object();
         this.started = false;
         this.converter = converter;
         this.offset = -1;
-        this.topicAdminSupplier = adminSupplier;
 
         this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
         if (this.topic == null || this.topic.trim().length() == 0)
@@ -479,7 +471,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).configStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -490,25 +481,30 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                 .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<String, byte[]>> consumedCallback,
-                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
-        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
-            log.debug("Creating admin client to manage Connect internal config topic");
-            // Create the topic if it doesn't exist
-            Set<String> newTopics = admin.createTopics(topicDescription);
-            if (!newTopics.contains(topic)) {
-                // It already existed, so check that the topic cleanup policy is compact only and not delete
-                log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                        DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
+                                                              final NewTopic topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                log.debug("Creating admin client to manage Connect internal config topic");
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
+                    }
+                }
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 26b47f9..8408f99 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -41,13 +41,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
 
 /**
  * <p>
@@ -64,16 +62,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
-    private final Supplier<TopicAdmin> topicAdminSupplier;
-
-    @Deprecated
-    public KafkaOffsetBackingStore() {
-        this.topicAdminSupplier = null;
-    }
-
-    public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
-        this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
-    }
 
     @Override
     public void configure(final WorkerConfig config) {
@@ -98,7 +86,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).offsetStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -109,25 +96,30 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
                 .replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
                 .build();
 
-        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
     }
 
     private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
-                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
-        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
-            log.debug("Creating admin client to manage Connect internal offset topic");
-            // Create the topic if it doesn't exist
-            Set<String> newTopics = admin.createTopics(topicDescription);
-            if (!newTopics.contains(topic)) {
-                // It already existed, so check that the topic cleanup policy is compact only and not delete
-                log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
+                                                              final NewTopic topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                log.debug("Creating admin client to manage Connect internal offset topic");
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
+                    }
+                }
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index abdbba8..5d6057d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -61,7 +61,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
 
 /**
  * StatusBackingStore implementation which uses a compacted topic for storage
@@ -129,24 +128,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
     protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
     protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
-    private final Supplier<TopicAdmin> topicAdminSupplier;
 
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
 
-    @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
-        this(time, converter, null);
-    }
-
-    public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier) {
         this.time = time;
         this.converter = converter;
         this.tasks = new Table<>();
         this.connectors = new HashMap<>();
         this.topics = new ConcurrentHashMap<>();
-        this.topicAdminSupplier = topicAdminSupplier;
     }
 
     // visible for testing
@@ -177,7 +169,6 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -195,25 +186,30 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
                 read(record);
             }
         };
-        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
+        this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
     }
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<String, byte[]>> consumedCallback,
-                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
-        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
-            log.debug("Creating admin client to manage Connect internal status topic");
-            // Create the topic if it doesn't exist
-            Set<String> newTopics = admin.createTopics(topicDescription);
-            if (!newTopics.contains(topic)) {
-                // It already existed, so check that the topic cleanup policy is compact only and not delete
-                log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
-                admin.verifyTopicCleanupPolicyOnlyCompact(topic,
-                        DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
+                                                              final NewTopic topicDescription, final Map<String, Object> adminProps) {
+        Runnable createTopics = new Runnable() {
+            @Override
+            public void run() {
+                log.debug("Creating admin client to manage Connect internal status topic");
+                try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
+                    }
+                }
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6a2a787..5248715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -41,12 +41,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 
 
 /**
@@ -81,15 +79,13 @@ public class KafkaBasedLog<K, V> {
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
-    private final Supplier<TopicAdmin> topicAdminSupplier;
     private Consumer<K, V> consumer;
     private Producer<K, V> producer;
-    private TopicAdmin admin;
 
     private Thread thread;
     private boolean stopRequested;
     private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-    private java.util.function.Consumer<TopicAdmin> initializer;
+    private Runnable initializer;
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
@@ -107,63 +103,31 @@ public class KafkaBasedLog<K, V> {
      * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
      * @param time Time interface
      * @param initializer the component that should be run when this log is {@link #start() started}; may be null
-     * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)}
      */
-    @Deprecated
     public KafkaBasedLog(String topic,
                          Map<String, Object> producerConfigs,
                          Map<String, Object> consumerConfigs,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
                          Time time,
                          Runnable initializer) {
-        this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null);
-    }
-
-    /**
-     * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
-     * {@link #start()} is invoked.
-     *
-     * @param topic              the topic to treat as a log
-     * @param producerConfigs    configuration options to use when creating the internal producer. At a minimum this must
-     *                           contain compatible serializer settings for the generic types used on this class. Some
-     *                           settings, such as the number of acks, will be overridden to ensure correct behavior of this
-     *                           class.
-     * @param consumerConfigs    configuration options to use when creating the internal consumer. At a minimum this must
-     *                           contain compatible serializer settings for the generic types used on this class. Some
-     *                           settings, such as the auto offset reset policy, will be overridden to ensure correct
-     *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled
-     *                           by the calling component; may not be null
-     * @param consumedCallback   callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
-     * @param time               Time interface
-     * @param initializer        the function that should be run when this log is {@link #start() started}; may be null
-     */
-    public KafkaBasedLog(String topic,
-            Map<String, Object> producerConfigs,
-            Map<String, Object> consumerConfigs,
-            Supplier<TopicAdmin> topicAdminSupplier,
-            Callback<ConsumerRecord<K, V>> consumedCallback,
-            Time time,
-            java.util.function.Consumer<TopicAdmin> initializer) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
-        this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier);
         this.consumedCallback = consumedCallback;
         this.stopRequested = false;
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
-        this.initializer = initializer != null ? initializer : admin -> { };
+        this.initializer = initializer != null ? initializer : new Runnable() {
+            @Override
+            public void run() {
+            }
+        };
     }
 
     public void start() {
         log.info("Starting KafkaBasedLog with topic " + topic);
 
-        // Create the topic admin client and initialize the topic ...
-        admin = topicAdminSupplier.get();   // may be null
-        initializer.accept(admin);
-
-        // Then create the producer and consumer
+        initializer.run();
         producer = createProducer();
         consumer = createConsumer();
 
@@ -229,9 +193,6 @@ public class KafkaBasedLog<K, V> {
             log.error("Failed to stop KafkaBasedLog consumer", e);
         }
 
-        // do not close the admin client, since we don't own it
-        admin = null;
-
         log.info("Stopped KafkaBasedLog for topic " + topic);
     }
 
@@ -321,29 +282,7 @@ public class KafkaBasedLog<K, V> {
         log.trace("Reading to end of offset log");
 
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets;
-        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
-        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
-        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
-        // one more record becomes available, meaning we can't even check whether we're at the end offset.
-        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
-        // (if available), which avoids the blocking behavior of
-        // 'consumer.endOffsets(...)' described above.
-
-        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
-        if (admin != null) {
-            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
-            // Unlike the consumer, this call returns immediately even when no new records are arriving.
-            endOffsets = admin.endOffsets(assignment);
-        } else {
-            // The admin may be null if the older, deprecated constructor is used, though AK Connect currently always provides an admin client.
-            // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the
-            // work thread may have blocked the consumer while waiting for more records (even when there are none).
-            // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be
-            // at the end offset.
-            endOffsets = consumer.endOffsets(assignment);
-        }
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
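
The deleted branch above delegated to TopicAdmin.endOffsets(...), which is
itself removed further below. A condensed sketch of that admin-side lookup,
with the per-exception mapping from the diff omitted:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class EndOffsetsSketch {
        // Ask the brokers directly for the latest offset of each partition; unlike
        // consumer.endOffsets(...), this cannot be blocked by an in-flight poll.
        static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions) throws Exception {
            Map<TopicPartition, OffsetSpec> specs = partitions.stream()
                    .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
            ListOffsetsResult result = admin.listOffsets(specs);
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                // resolve per partition so a failure can name the offending partition
                offsets.put(tp, result.partitionResult(tp).get().offset());
            }
            return offsets;
        }
    }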
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
deleted file mode 100644
index a99514e..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.connect.errors.ConnectException;
-
-/**
- * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
- * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
- * instance until this SharedTopicAdmin is closed via {@link #close()} or {@link #close(Duration)}.
- *
- * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
- * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
- * {@link SharedTopicAdmin} instance has been closed, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
- *
- * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
- * until this object is closed, at which point it cannot be used anymore.
- */
-public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
-
-    // Visible for testing
-    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
-
-    private final Map<String, Object> adminProps;
-    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final Function<Map<String, Object>, TopicAdmin> factory;
-
-    public SharedTopicAdmin(Map<String, Object> adminProps) {
-        this(adminProps, TopicAdmin::new);
-    }
-
-    // Visible for testing
-    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
-        this.adminProps = Objects.requireNonNull(adminProps);
-        this.factory = Objects.requireNonNull(factory);
-    }
-
-    /**
-     * Get the shared {@link TopicAdmin} instance.
-     *
-     * @return the shared instance; never null
-     * @throws ConnectException if this object has already been closed
-     */
-    @Override
-    public TopicAdmin get() {
-        return topicAdmin();
-    }
-
-    /**
-     * Get the shared {@link TopicAdmin} instance.
-     *
-     * @return the shared instance; never null
-     * @throws ConnectException if this object has already been closed
-     */
-    public TopicAdmin topicAdmin() {
-        return admin.updateAndGet(this::createAdmin);
-    }
-
-    /**
-     * Get the string containing the list of bootstrap server addresses to the Kafka broker(s) to which
-     * the admin client connects.
-     *
-     * @return the bootstrap servers as a string; never null
-     */
-    public String bootstrapServers() {
-        return adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<unknown>").toString();
-    }
-
-    /**
-     * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created.
-     *
- * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
-     */
-    @Override
-    public void close() {
-        close(DEFAULT_CLOSE_DURATION);
-    }
-
-    /**
-     * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created.
-     *
- * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
-     *
-     * @param timeout the maximum time to wait while the underlying admin client is closed; may not be null
-     */
-    public void close(Duration timeout) {
-        Objects.requireNonNull(timeout);
-        if (this.closed.compareAndSet(false, true)) {
-            TopicAdmin admin = this.admin.getAndSet(null);
-            if (admin != null) {
-                admin.close(timeout);
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "admin client for brokers at " + bootstrapServers();
-    }
-
-    /**
-     * Method used to create a {@link TopicAdmin} instance. This method must be side-effect free, since it is called from within
-     * the {@link AtomicReference#updateAndGet(UnaryOperator)}.
-     *
-     * @param existing the existing instance; may be null
-     * @return the existing instance if it is not null, or otherwise a newly created {@link TopicAdmin} instance; never null
-     */
-    protected TopicAdmin createAdmin(TopicAdmin existing) {
-        if (closed.get()) {
-            throw new ConnectException("The " + this + " has already been closed and cannot be used.");
-        }
-        if (existing != null) {
-            return existing;
-        }
-        return factory.apply(adminProps);
-    }
-}
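
For reference, the deleted holder was used by the callers shown in the
MirrorMaker and ConnectDistributed hunks above. A short usage sketch (it only
compiles against the pre-revert tree; the bootstrap address is a placeholder):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.connect.util.SharedTopicAdmin;
    import org.apache.kafka.connect.util.TopicAdmin;

    public class SharedTopicAdminUsageSketch {
        public static void main(String[] args) {
            Map<String, Object> adminProps = new HashMap<>();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // SharedTopicAdmin is AutoCloseable, so try-with-resources mirrors the
            // "herder closes it at shutdown" arrangement described in the diff.
            try (SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps)) {
                TopicAdmin first = sharedAdmin.get();         // lazily creates the TopicAdmin
                TopicAdmin second = sharedAdmin.topicAdmin(); // same shared instance
                System.out.println(first == second);          // true
            } // close() shuts the underlying admin client down exactly once
        }
    }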
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 8b7e456..428343b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -23,20 +23,14 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
@@ -59,7 +53,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -290,14 +283,6 @@ public class TopicAdmin implements AutoCloseable {
     }
 
     /**
-     * Get the {@link Admin} client used by this topic admin object.
-     * @return the Kafka admin instance; never null
-     */
-    public Admin admin() {
-        return admin;
-    }
-
-   /**
      * Attempt to create the topic described by the given definition, returning true if the topic was created or false
      * if the topic already existed.
      *
@@ -645,58 +630,6 @@ public class TopicAdmin implements AutoCloseable {
         return result;
     }
 
-    /**
-     * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects.
-     *
-     * @param partitions the topic partitions
-     * @return the map of end offsets for each topic partition, or an empty map if the supplied partitions
-     *         are null or empty
-     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
-     *         thread is interrupted while attempting to perform this operation
-     * @throws ConnectException if a non-retriable error occurs
-     */
-    public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
-        if (partitions == null || partitions.isEmpty()) {
-            return Collections.emptyMap();
-        }
-        Map<TopicPartition, OffsetSpec> offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
-        ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap);
-        // Get the individual result for each topic partition so we have better error messages
-        Map<TopicPartition, Long> result = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            try {
-                ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get();
-                result.put(partition, info.offset());
-            } catch (ExecutionException e) {
-                Throwable cause = e.getCause();
-                String topic = partition.topic();
-                if (cause instanceof AuthorizationException) {
-                    String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new ConnectException(msg, e);
-                } else if (cause instanceof UnsupportedVersionException) {
-                    // Should theoretically never happen, because this method is the same as what the consumer uses and therefore
-                    // should exist in the broker since before the admin client was added
-                    String msg = String.format("API to get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers());
-                    throw new ConnectException(msg, e);
-                } else if (cause instanceof TimeoutException) {
-                    String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new RetriableException(msg, e);
-                } else if (cause instanceof LeaderNotAvailableException) {
-                    String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new RetriableException(msg, e);
-                } else {
-                    String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
-                    throw new ConnectException(msg, e);
-                }
-            } catch (InterruptedException e) {
-                Thread.interrupted();
-                String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers());
-                throw new RetriableException(msg, e);
-            }
-        }
-        return result;
-    }
-
     @Override
     public void close() {
         admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index bed46a3..bea062e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -77,7 +77,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -192,7 +191,6 @@ public class DistributedHerderTest {
     @Mock private Plugins plugins;
     @Mock private PluginClassLoader pluginLoader;
     @Mock private DelegatingClassLoader delegatingLoader;
-    private CountDownLatch shutdownCalled = new CountDownLatch(1);
 
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
@@ -210,7 +208,6 @@ public class DistributedHerderTest {
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
-        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
 
         // Default to the old protocol unless specified otherwise
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
@@ -218,8 +215,7 @@ public class DistributedHerderTest {
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
-                new AutoCloseable[]{uponShutdown});
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener(time);
@@ -2309,13 +2305,6 @@ public class DistributedHerderTest {
                 getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor"));
     }
 
-    @Test
-    public void testHerderStopServicesClosesUponShutdown() {
-        assertEquals(1, shutdownCalled.getCount());
-        herder.stopServices();
-        assertEquals(0, shutdownCalled.getCount());
-    }
-
     private void expectRebalance(final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 7d301ca..a46f6fe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TestFuture;
-import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -58,7 +57,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -144,7 +142,7 @@ public class KafkaConfigBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
     private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
@@ -897,7 +895,7 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                 EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
-                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
                 .andReturn(storeLog);
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index ebcf208..0d0194f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -51,7 +50,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -108,7 +106,7 @@ public class KafkaOffsetBackingStoreTest {
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
     private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
     private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
@@ -392,7 +390,7 @@ public class KafkaOffsetBackingStoreTest {
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
                 EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
-                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
+                EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
                 .andReturn(storeLog);
     }
 
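The offset store test mirrors the config store change above; both expectConfigure() helpers use PowerMock's expectPrivate to stub a private factory method on a partially mocked store. A compact, runnable sketch of that pattern under JUnit 4 follows; ClassUnderTest and its createLog method are hypothetical, not Connect code.

    import org.easymock.EasyMock;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.api.easymock.PowerMock;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest(PrivateFactoryTest.ClassUnderTest.class)
    public class PrivateFactoryTest {

        public static class ClassUnderTest {
            public String start() {
                return createLog("topic");
            }
            private String createLog(String topic) {
                return "real-" + topic;
            }
        }

        @Test
        public void shouldStubPrivateFactory() throws Exception {
            // Partially mock the class so only the private method is replaced.
            ClassUnderTest instance =
                    PowerMock.createPartialMock(ClassUnderTest.class, "createLog");
            PowerMock.expectPrivate(instance, "createLog", EasyMock.eq("topic"))
                    .andReturn("stubbed");
            PowerMock.replayAll();

            // start() runs for real but receives the stubbed private result.
            Assert.assertEquals("stubbed", instance.start());
            PowerMock.verifyAll();
        }
    }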
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
deleted file mode 100644
index f5ac6a7..0000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-import java.util.function.Function;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.junit.Rule;
-import org.mockito.Mock;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-import static org.apache.kafka.connect.util.SharedTopicAdmin.DEFAULT_CLOSE_DURATION;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThrows;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SharedTopicAdminTest {
-
-    private static final Map<String, Object> EMPTY_CONFIG = Collections.emptyMap();
-
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock private TopicAdmin mockTopicAdmin;
-    @Mock private Function<Map<String, Object>, TopicAdmin> factory;
-    private SharedTopicAdmin sharedAdmin;
-
-    @Before
-    public void beforeEach() {
-        when(factory.apply(anyMap())).thenReturn(mockTopicAdmin);
-        sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply);
-    }
-
-    @Test
-    public void shouldCloseWithoutBeingUsed() {
-        // When closed before being used
-        sharedAdmin.close();
-        // Then should not create or close admin
-        verifyTopicAdminCreatesAndCloses(0);
-    }
-
-    @Test
-    public void shouldCloseAfterTopicAdminUsed() {
-        // When used and then closed
-        assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
-        sharedAdmin.close();
-        // Then should have created and closed just one admin
-        verifyTopicAdminCreatesAndCloses(1);
-    }
-
-    @Test
-    public void shouldCloseAfterTopicAdminUsedMultipleTimes() {
-        // When used many times and then closed
-        for (int i = 0; i != 10; ++i) {
-            assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
-        }
-        sharedAdmin.close();
-        // Then should have created and closed just one admin
-        verifyTopicAdminCreatesAndCloses(1);
-    }
-
-    @Test
-    public void shouldCloseWithDurationAfterTopicAdminUsed() {
-        // When used and then closed with a custom timeout
-        Duration timeout = Duration.ofSeconds(1);
-        assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
-        sharedAdmin.close(timeout);
-        // Then should have created and closed just one admin using the supplied timeout
-        verifyTopicAdminCreatesAndCloses(1, timeout);
-    }
-
-    @Test
-    public void shouldFailToGetTopicAdminAfterClose() {
-        // When closed
-        sharedAdmin.close();
-        // Then using the admin should fail
-        assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin());
-    }
-
-    private void verifyTopicAdminCreatesAndCloses(int count) {
-        verifyTopicAdminCreatesAndCloses(count, DEFAULT_CLOSE_DURATION);
-    }
-
-    private void verifyTopicAdminCreatesAndCloses(int count, Duration expectedDuration) {
-        verify(factory, times(count)).apply(anyMap());
-        verify(mockTopicAdmin, times(count)).close(eq(expectedDuration));
-    }
-}
\ No newline at end of file
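The deleted SharedTopicAdminTest pinned down the contract of the now-removed SharedTopicAdmin: create the underlying admin client lazily on first use, hand every caller the same instance, close safely whether or not the admin was ever created, and reject use after close (the removed class threw ConnectException). A simplified, generic sketch of that lazy-shared-resource pattern, assuming nothing beyond the JDK, is below; it is an illustration, not the removed implementation.

    import java.util.function.Supplier;

    public class LazyShared<T extends AutoCloseable> implements AutoCloseable {
        private final Supplier<T> factory;
        private T instance;
        private boolean closed;

        public LazyShared(Supplier<T> factory) {
            this.factory = factory;
        }

        // Created on first use; later calls return the same instance.
        public synchronized T get() {
            if (closed) {
                // The removed class threw ConnectException here.
                throw new IllegalStateException("already closed");
            }
            if (instance == null) {
                instance = factory.get();
            }
            return instance;
        }

        // Safe to call even if get() was never invoked; closes at most once.
        @Override
        public synchronized void close() throws Exception {
            closed = true;
            if (instance != null) {
                T toClose = instance;
                instance = null;
                toClose.close();
            }
        }

        public static void main(String[] args) throws Exception {
            LazyShared<AutoCloseable> shared =
                    new LazyShared<AutoCloseable>(() -> () -> System.out.println("closed"));
            shared.get();   // factory invoked exactly once here
            shared.get();   // same instance returned
            shared.close(); // underlying resource closed exactly once
        }
    }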
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index dbe2910..e4d069d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,12 +17,9 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -30,14 +27,11 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -53,13 +47,10 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
@@ -71,11 +62,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TopicAdminTest {
 
@@ -474,175 +460,6 @@ public class TopicAdminTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private <T> KafkaFuture<T> mockFuture() {
-        return (KafkaFuture<T>) mock(KafkaFuture.class);
-    }
-
-    private Admin expectAdminListOffsetsFailure(Throwable expected) throws Exception {
-        // When the admin client lists offsets
-        Admin mockAdmin = mock(Admin.class);
-        ListOffsetsResult results = mock(ListOffsetsResult.class);
-        when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
-        // and throws an exception via the future.get()
-        ExecutionException execException = new ExecutionException(expected);
-        KafkaFuture<ListOffsetsResultInfo> future = mockFuture();
-        when(future.get()).thenThrow(execException);
-        when(results.partitionResult(any(TopicPartition.class))).thenReturn(future);
-        return mockAdmin;
-    }
-
-    private void expectOffsets(ListOffsetsResult results, TopicPartition tp, long offset) throws Exception {
-        // Then return the given offset for the requested partition
-        ListOffsetsResultInfo resultsInfo = new ListOffsetsResultInfo(offset, 0L, Optional.empty());
-        KafkaFuture<ListOffsetsResultInfo> future = mockFuture();
-        when(future.get()).thenReturn(resultsInfo);
-        when(results.partitionResult(eq(tp))).thenReturn(future);
-    }
-
-    @Test
-    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() throws Exception {
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When the admin client's listOffsets call fails with an exception
-        Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed"));
-
-        // Then the topic admin should throw an exception
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
-            admin.endOffsets(tps);
-        });
-        assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
-    }
-
-    @Test
-    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() throws Exception {
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When the admin client's listOffsets call fails with an exception
-        Admin mockAdmin = expectAdminListOffsetsFailure(new UnsupportedVersionException("failed"));
-
-        // Then the topic admin should throw an exception
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
-            admin.endOffsets(tps);
-        });
-        assertTrue(e.getMessage().contains("is unsupported on brokers"));
-    }
-
-    @Test
-    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() throws Exception {
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When the admin client's listOffsets call fails with an exception
-        Admin mockAdmin = expectAdminListOffsetsFailure(new TimeoutException("failed"));
-
-        // Then the topic admin should throw an exception
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
-            admin.endOffsets(tps);
-        });
-        assertTrue(e.getMessage().contains("Timed out while waiting"));
-    }
-
-    @Test
-    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() throws Exception {
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When the admin client's listOffsets call fails with an exception
-        Admin mockAdmin = expectAdminListOffsetsFailure(new RuntimeException("failed"));
-
-        // Then the topic admin should throw an exception
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
-            admin.endOffsets(tps);
-        });
-        assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
-    }
-
-    @Test
-    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsEmpty() throws Exception {
-        // Then the topic admin should return an empty map immediately
-        Admin mockAdmin = mock(Admin.class);
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
-        assertTrue(offsets.isEmpty());
-    }
-
-    @Test
-    public void endOffsetsShouldReturnOffsetsForOnePartition() throws Exception {
-        String topicName = "myTopic";
-        long offset = 1L;
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When listOffsets is called with a single topic partition
-        Admin mockAdmin = mock(Admin.class);
-        ListOffsetsResult results = mock(ListOffsetsResult.class);
-        when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
-        expectOffsets(results, tp1, offset);
-
-        // Then the topic admin should return offsets
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
-        assertEquals(1, offsets.size());
-        assertEquals(Long.valueOf(offset), offsets.get(tp1));
-    }
-
-    @Test
-    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() throws Exception {
-        long offset1 = 1L;
-        long offset2 = 2L;
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        TopicPartition tp2 = new TopicPartition(topicName, 1);
-        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
-
-        // When listOffsets is called with multiple topic partitions
-        Admin mockAdmin = mock(Admin.class);
-        ListOffsetsResult results = mock(ListOffsetsResult.class);
-        when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
-        expectOffsets(results, tp1, offset1);
-        expectOffsets(results, tp2, offset2);
-
-        // Then the topic admin should return offsets
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
-        assertEquals(2, offsets.size());
-        assertEquals(Long.valueOf(offset1), offsets.get(tp1));
-        assertEquals(Long.valueOf(offset2), offsets.get(tp2));
-    }
-
-    @Test
-    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() throws Exception {
-        String topicName = "myTopic";
-        TopicPartition tp1 = new TopicPartition(topicName, 0);
-        Set<TopicPartition> tps = Collections.singleton(tp1);
-
-        // When the admin client's listOffsets call fails with an exception
-        Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed"));
-
-        // Then the topic admin should throw an exception
-        TopicAdmin admin = new TopicAdmin(null, mockAdmin);
-        ConnectException e = assertThrows(ConnectException.class, () -> {
-            admin.endOffsets(tps);
-        });
-        assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
-    }
-
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {