Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/11 04:25:42 UTC

[kafka] branch 2.6 updated: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy (#8828)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 46840cb  KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy (#8828)
46840cb is described below

commit 46840cbfe51aab3f74449112b8ee6d89ca221bb2
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Wed Jun 10 22:39:52 2020 -0500

    KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy (#8828)
    
    This change adds a check to KafkaConfigBackingStore, KafkaOffsetBackingStore, and KafkaStatusBackingStore that uses the admin client to verify that the internal topics are compacted and do not use the `delete` cleanup policy.
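    
    For illustration only (this sketch is not part of the commit), the kind of check the backing stores now perform can be reproduced with the plain Java `Admin` client; the bootstrap servers and topic name below are placeholder assumptions:
    
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.Config;
        import org.apache.kafka.clients.admin.ConfigEntry;
        import org.apache.kafka.common.config.ConfigResource;
        import org.apache.kafka.common.config.TopicConfig;
    
        import java.util.Collections;
        import java.util.Properties;
    
        public class VerifyCompactCleanupPolicy {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                // Placeholder broker address; a real worker would use its own bootstrap.servers
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                // Placeholder topic name; Connect applies this check to its config, offset, and status topics
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "connect-configs");
                try (Admin admin = Admin.create(props)) {
                    Config config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
                    ConfigEntry policy = config.get(TopicConfig.CLEANUP_POLICY_CONFIG);
                    // Connect requires exactly "compact"; "delete" or "compact,delete" would eventually
                    // purge connector configurations, source offsets, and statuses
                    if (policy == null || policy.value() == null
                            || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(policy.value().trim())) {
                        throw new IllegalStateException("Topic " + resource.name() + " must use cleanup.policy=compact");
                    }
                }
            }
        }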
    
    Connect already creates the internal topics with `cleanup.policy=compact` if they do not yet exist when the Connect workers start; new topics are always created as compacted, overwriting any user-specified `cleanup.policy`. However, if the topics already exist, such as when a user manually creates the internal topics before starting Connect or manually changes the topic settings after the fact, the worker previously did not verify that the internal topics were compacted.
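    
    If an operator does pre-create the internal topics manually, a minimal sketch with the Java `Admin` client might look like the following (again not taken from this change; the topic name, partition count, and replication factor are placeholder assumptions):
    
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.NewTopic;
        import org.apache.kafka.common.config.TopicConfig;
    
        import java.util.Collections;
        import java.util.Properties;
    
        public class PreCreateCompactedConnectTopic {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                // Placeholder broker address
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // Placeholder name, partitions, and replication factor; the essential part is the
                    // explicit cleanup.policy=compact, which the new check verifies on worker startup
                    NewTopic configTopic = new NewTopic("connect-configs", 1, (short) 3)
                            .configs(Collections.singletonMap(
                                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                    admin.createTopics(Collections.singletonList(configTopic)).all().get();
                }
            }
        }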
    
    The current change helps guard against users running Connect with topics that have the `delete` cleanup policy enabled, which removes all connector configurations, source offsets, and connector and task statuses that are older than the retention time. For example, the configuration of a long-running connector could be deleted by the broker, causing restart problems upon a subsequent rebalance or restart of the Connect worker(s).
    
    Connect requires that its internal topics be compacted and never deleted after some retention time. This additional check simply enforces the existing expectations and therefore does not need a KIP.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Chris Egerton <ch...@confluent.io>
---
 .../kafka/clients/admin/MockAdminClient.java       |  45 +++++
 .../kafka/connect/runtime/AbstractHerder.java      |   6 +
 .../org/apache/kafka/connect/runtime/Connect.java  |   4 +
 .../org/apache/kafka/connect/runtime/Herder.java   |   2 +
 .../runtime/distributed/DistributedHerder.java     |   4 +
 .../runtime/standalone/StandaloneHerder.java       |   2 +
 .../connect/storage/KafkaConfigBackingStore.java   |  10 +-
 .../connect/storage/KafkaOffsetBackingStore.java   |  11 +-
 .../connect/storage/KafkaStatusBackingStore.java   |  10 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 164 +++++++++++++++
 .../integration/InternalTopicsIntegrationTest.java | 138 +++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  | 219 +++++++++++++++++++++
 .../util/clusters/EmbeddedConnectCluster.java      |  22 ++-
 .../util/clusters/EmbeddedKafkaCluster.java        |   4 +-
 .../kafka/connect/util/clusters/WorkerHandle.java  |   9 +
 15 files changed, 645 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c084c99..2b86d4f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -366,6 +366,51 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
+        Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (ConfigResource requestedResource : resources) {
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                topicConfigs.put(requestedResource, future);
+            }
+
+            --timeoutNextRequests;
+            return new DescribeConfigsResult(topicConfigs);
+        }
+
+        for (ConfigResource requestedResource : resources) {
+            if (requestedResource.type() != ConfigResource.Type.TOPIC) {
+                continue;
+            }
+            for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
+                String topicName = topicDescription.getKey();
+                if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion) {
+                    if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+                        topicDescription.getValue().fetchesRemainingUntilVisible--;
+                    } else {
+                        TopicMetadata topicMetadata = topicDescription.getValue();
+                        KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                        Collection<ConfigEntry> entries = new ArrayList<>();
+                        topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k, v)));
+                        future.complete(new Config(entries));
+                        topicConfigs.put(requestedResource, future);
+                        break;
+                    }
+                }
+            }
+            if (!topicConfigs.containsKey(requestedResource)) {
+                KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new UnknownTopicOrPartitionException("Resource " + requestedResource + " not found."));
+                topicConfigs.put(requestedResource, future);
+            }
+        }
+
+        return new DescribeConfigsResult(topicConfigs);
+    }
+
+    @Override
     synchronized public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) {
         Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 08163cc..5e3cb86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -94,6 +94,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
     private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    protected volatile boolean running = false;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
 
@@ -132,6 +133,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    @Override
     public void onStartup(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
                 workerId, generation()));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index f08586f..80eef03 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -84,6 +84,10 @@ public class Connect {
         }
     }
 
+    public boolean isRunning() {
+        return herder.isRunning();
+    }
+
     // Visible for testing
     public URI restUrl() {
         return rest.serverUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index cbd7f3f..c8b6b80 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -59,6 +59,8 @@ public interface Herder {
 
     void stop();
 
+    boolean isRunning();
+
     /**
      * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
      * from the current configuration. However, note
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7762f9f..c7051db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -288,6 +288,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             startServices();
 
             log.info("Herder started");
+            running = true;
 
             while (!stopping.get()) {
                 tick();
@@ -300,6 +301,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
             Exit.exit(1);
+        } finally {
+            running = false;
         }
     }
 
@@ -638,6 +641,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
 
         log.info("Herder stopped");
+        running = false;
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ade3f9e..a2b082e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -92,6 +92,7 @@ public class StandaloneHerder extends AbstractHerder {
     public synchronized void start() {
         log.info("Herder starting");
         startServices();
+        running = true;
         log.info("Herder started");
     }
 
@@ -114,6 +115,7 @@ public class StandaloneHerder extends AbstractHerder {
             worker.stopConnector(connName);
         }
         stopServices();
+        running = false;
         log.info("Herder stopped");
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9512e03..29299a8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -492,7 +493,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
+                    }
                 }
             }
         };
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 1d26d65..8408f99 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +41,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -106,7 +108,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal offset topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
+                    }
                 }
             }
         };
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 0b0f4a5..5d6057d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -197,7 +198,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal status topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);
+                    if (!newTopics.contains(topic)) {
+                        // It already existed, so check that the topic cleanup policy is compact only and not delete
+                        log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+                        admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+                                DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
+                    }
                 }
             }
         };
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 4809b49..615e7a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -18,11 +18,16 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreateTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeConfigsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
@@ -39,13 +44,16 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 /**
  * Utility to simplify creating and managing topics via the {@link Admin}.
@@ -375,6 +383,162 @@ public class TopicAdmin implements AutoCloseable {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @param topicPurpose      a short description of the topic's purpose, used in error messages
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.info("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability of "
+                    + "%s, but found the topic currently has '%s=%s'. Continuing would likely "
+                    + "result in eventually losing %s and problems restarting this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with '%s=%s'.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // The topic must not exist
+            log.debug("Unable to find topic '{}' when getting cleanup policy", topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, topic);
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .filter(s -> !s.isEmpty())
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        // This is unexpected, as the topic config should include the cleanup.policy even if
+        // the topic settings don't override the broker's log.cleanup.policy. But just to be safe.
+        log.debug("Found no cleanup.policy for topic '{}'", topic);
+        return Collections.emptySet();
+    }
+
+    /**
+     * Attempt to fetch the topic configuration for the given topic.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns a null value.
+     *
+     * <p>If the topic does not exist, a null value is returned.
+     *
+     * @param topic the name of the topic for which the topic configuration should be obtained
+     * @return the topic configuration if the topic exists, or null if the topic did not exist
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non-retriable error occurs
+     */
+    public Config describeTopicConfig(String topic) {
+        return describeTopicConfigs(topic).get(topic);
+    }
+
+    /**
+     * Attempt to fetch the topic configurations for the given topics.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns an empty map.
+     *
+     * <p>An entry with a null Config is placed into the resulting map for any topic that does
+     * not exist on the brokers.
+     *
+     * @param topicNames the topics for which to obtain configurations
+     * @return the map of topic configurations keyed by topic name, where a topic that does not
+     *         exist is mapped to a null Config; may be empty if no configurations could be retrieved
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non-retriable error occurs
+     */
+    public Map<String, Config> describeTopicConfigs(String... topicNames) {
+        if (topicNames == null) {
+            return Collections.emptyMap();
+        }
+        Collection<String> topics = Arrays.stream(topicNames)
+                                          .filter(Objects::nonNull)
+                                          .map(String::trim)
+                                          .filter(s -> !s.isEmpty())
+                                          .collect(Collectors.toList());
+        if (topics.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        String bootstrapServers = bootstrapServers();
+        String topicNameList = topics.stream().collect(Collectors.joining(", "));
+        Collection<ConfigResource> resources = topics.stream()
+                                                     .map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t))
+                                                     .collect(Collectors.toList());
+
+        Map<ConfigResource, KafkaFuture<Config>> newResults = admin.describeConfigs(resources, new DescribeConfigsOptions()).values();
+
+        // Iterate over each future so that we can handle individual failures like when some topics don't exist
+        Map<String, Config> result = new HashMap<>();
+        newResults.forEach((resource, configs) -> {
+            String topic = resource.name();
+            try {
+                result.put(topic, configs.get());
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof UnknownTopicOrPartitionException) {
+                    log.debug("Topic '{}' does not exist on the brokers at {}", topic, bootstrapServers);
+                    result.put(topic, null);
+                } else if (cause instanceof ClusterAuthorizationException || cause instanceof TopicAuthorizationException) {
+                    log.debug("Not authorized to describe topic config for topic '{}' on brokers at {}", topic, bootstrapServers);
+                } else if (cause instanceof UnsupportedVersionException) {
+                    log.debug("API to describe topic config for topic '{}' is unsupported on brokers at {}", topic, bootstrapServers);
+                } else if (cause instanceof TimeoutException) {
+                    String msg = String.format("Timed out while waiting to describe topic config for topic '%s' on brokers at %s",
+                            topic, bootstrapServers);
+                    throw new RetriableException(msg, e);
+                } else {
+                    String msg = String.format("Error while attempting to describe topic config for topic '%s' on brokers at %s",
+                            topic, bootstrapServers);
+                    throw new ConnectException(msg, e);
+                }
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                String msg = String.format("Interrupted while attempting to describe topic configs '%s'", topicNameList);
+                throw new RetriableException(msg, e);
+            }
+        });
+        return result;
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
index 191de84..d73d1c4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.kafka.connect.integration;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
@@ -30,6 +33,8 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.Assert.assertFalse;
+
 /**
  * Integration test for the creation of internal topics.
  */
@@ -141,6 +146,139 @@ public class InternalTopicsIntegrationTest {
         connect.assertions().assertTopicsDoNotExist(configTopic(), offsetTopic());
     }
 
+    @Test
+    public void testFailToStartWhenInternalTopicsAreNotCompacted() throws InterruptedException {
+        // Change the broker default cleanup policy to something Connect doesn't like
+        brokerProps.put("log." + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
+        // Start out using the improperly configured topics
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "bad-config");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "bad-offset");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "bad-status");
+        workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        int numWorkers = 0;
+        int numBrokers = 1;
+        connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
+                                                      .workerProps(workerProps)
+                                                      .numWorkers(numWorkers)
+                                                      .numBrokers(numBrokers)
+                                                      .brokerProps(brokerProps)
+                                                      .build();
+
+        // Start the brokers but not Connect
+        log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers);
+        connect.start();
+        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
+        log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);
+
+        // Create the good topics
+        connect.kafka().createTopic("good-config", 1, 1, compactCleanupPolicy());
+        connect.kafka().createTopic("good-offset", 1, 1, compactCleanupPolicy());
+        connect.kafka().createTopic("good-status", 1, 1, compactCleanupPolicy());
+
+        // Create the poorly-configured topics
+        connect.kafka().createTopic("bad-config", 1, 1, deleteCleanupPolicy());
+        connect.kafka().createTopic("bad-offset", 1, 1, compactAndDeleteCleanupPolicy());
+        connect.kafka().createTopic("bad-status", 1, 1, noTopicSettings());
+
+        // Check the topics
+        log.info("Verifying the internal topics for Connect were manually created");
+        connect.assertions().assertTopicsExist("good-config", "good-offset", "good-status", "bad-config", "bad-offset", "bad-status");
+
+        // Try to start one worker, with three bad topics
+        WorkerHandle worker = connect.addWorker(); // should have failed to start before returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+
+        // We rely upon the fact that we can change the worker properties before the workers are started
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "good-config");
+
+        // Try to start one worker, with two bad topics remaining
+        worker = connect.addWorker(); // should have failed to start before returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+
+        // We rely upon the fact that we can change the worker properties before the workers are started
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "good-offset");
+
+        // Try to start one worker, with one bad topic remaining
+        worker = connect.addWorker(); // should have failed to start before returning
+        assertFalse(worker.isRunning());
+        assertFalse(connect.allWorkersRunning());
+        assertFalse(connect.anyWorkersRunning());
+        connect.removeWorker(worker);
+        // We rely upon the fact that we can change the worker properties before the workers are started
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "good-status");
+
+        // Try to start one worker, now using all good internal topics
+        connect.addWorker();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not start in time.");
+    }
+
+    @Test
+    public void testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy() throws InterruptedException {
+        // Change the broker default cleanup policy to compact, which is good for Connect
+        brokerProps.put("log." + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+        // Start out using the properly configured topics
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "offset-topic");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        workerProps.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
+        int numWorkers = 0;
+        int numBrokers = 1;
+        connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")
+                                                      .workerProps(workerProps)
+                                                      .numWorkers(numWorkers)
+                                                      .numBrokers(numBrokers)
+                                                      .brokerProps(brokerProps)
+                                                      .build();
+
+        // Start the brokers but not Connect
+        log.info("Starting {} Kafka brokers, but no Connect workers yet", numBrokers);
+        connect.start();
+        connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Broker did not start in time.");
+        log.info("Completed startup of {} Kafka broker. Expected Connect worker to fail", numBrokers);
+
+        // Create the valid internal topics w/o topic settings, so these will use the broker's
+        // log.cleanup.policy=compact (set above)
+        connect.kafka().createTopic("config-topic", 1, 1, noTopicSettings());
+        connect.kafka().createTopic("offset-topic", 1, 1, noTopicSettings());
+        connect.kafka().createTopic("status-topic", 1, 1, noTopicSettings());
+
+        // Check the topics
+        log.info("Verifying the internal topics for Connect were manually created");
+        connect.assertions().assertTopicsExist("config-topic", "offset-topic", "status-topic");
+
+        // Try to start one worker using valid internal topics
+        connect.addWorker();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Worker did not start in time.");
+    }
+
+    protected Map<String, String> compactCleanupPolicy() {
+        return Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
+    }
+
+    protected Map<String, String> deleteCleanupPolicy() {
+        return Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
+    }
+
+    protected Map<String, String> noTopicSettings() {
+        return Collections.emptyMap();
+    }
+
+    protected Map<String, String> compactAndDeleteCleanupPolicy() {
+        Map<String, String> config = new HashMap<>();
+        config.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE + "," + TopicConfig.CLEANUP_POLICY_COMPACT);
+        return config;
+    }
+
     protected void assertInternalTopicSettings() throws InterruptedException {
         DistributedConfig config = new DistributedConfig(workerProps);
         connect.assertions().assertTopicSettings(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index caaa639..b655664 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
+import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -27,6 +28,9 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -36,11 +40,14 @@ import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +57,8 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -261,6 +270,187 @@ public class TopicAdminTest {
         }
     }
 
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenNoTopicsAreSpecified() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<String, Config> results = admin.describeTopicConfigs();
+            assertTrue(results.isEmpty());
+        }
+    }
+
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenUnsupportedVersionFailure() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<String, Config> results = admin.describeTopicConfigs(newTopic.name());
+            assertTrue(results.isEmpty());
+        }
+    }
+
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenClusterAuthorizationFailure() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithClusterAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<String, Config> results = admin.describeTopicConfigs(newTopic.name());
+            assertTrue(results.isEmpty());
+        }
+    }
+
+    @Test
+    public void describeTopicConfigShouldReturnEmptyMapWhenTopicAuthorizationFailure() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithTopicAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<String, Config> results = admin.describeTopicConfigs(newTopic.name());
+            assertTrue(results.isEmpty());
+        }
+    }
+
+    @Test
+    public void describeTopicConfigShouldReturnMapWithNullValueWhenTopicDoesNotExist() {
+        NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (TopicAdmin admin = new TopicAdmin(null, new MockAdminClient(cluster.nodes(), cluster.nodeById(0)))) {
+            Map<String, Config> results = admin.describeTopicConfigs(newTopic.name());
+            assertFalse(results.isEmpty());
+            assertEquals(1, results.size());
+            assertNull(results.get("myTopic"));
+        }
+    }
+
+    @Test
+    public void describeTopicConfigShouldReturnTopicConfigWhenTopicExists() {
+        String topicName = "myTopic";
+        NewTopic newTopic = TopicAdmin.defineTopic(topicName)
+                                      .config(Collections.singletonMap("foo", "bar"))
+                                      .partitions(1)
+                                      .compacted()
+                                      .build();
+        Cluster cluster = createCluster(1);
+        try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, topicName, Collections.singletonList(topicPartitionInfo), null);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            Map<String, Config> result = admin.describeTopicConfigs(newTopic.name());
+            assertFalse(result.isEmpty());
+            assertEquals(1, result.size());
+            Config config = result.get("myTopic");
+            assertNotNull(config);
+            config.entries().forEach(entry -> {
+                assertEquals(newTopic.configs().get(entry.name()), entry.value());
+            });
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenBrokerVersionIsUnsupported() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithUnsupportedVersion(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean result = admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            assertFalse(result);
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenClusterAuthorizationError() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithClusterAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean result = admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            assertFalse(result);
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnFalseWhenTopicAuthorizationError() {
+        final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
+        Cluster cluster = createCluster(1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().prepareResponse(describeConfigsResponseWithTopicAuthorizationException(newTopic));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            boolean result = admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            assertFalse(result);
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldReturnTrueWhenTopicHasCorrectPolicy() {
+        String topicName = "myTopic";
+        Map<String, String> topicConfigs = Collections.singletonMap("cleanup.policy", "compact");
+        Cluster cluster = createCluster(1);
+        try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, topicName, Collections.singletonList(topicPartitionInfo), topicConfigs);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            boolean result = admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            assertTrue(result);
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldFailWhenTopicHasDeletePolicy() {
+        String topicName = "myTopic";
+        Map<String, String> topicConfigs = Collections.singletonMap("cleanup.policy", "delete");
+        Cluster cluster = createCluster(1);
+        try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, topicName, Collections.singletonList(topicPartitionInfo), topicConfigs);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            ConfigException e = assertThrows(ConfigException.class, () -> {
+                admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            });
+            assertTrue(e.getMessage().contains("to guarantee consistency and durability"));
+        }
+    }
+
+    @Test
+    public void verifyingTopicCleanupPolicyShouldFailWhenTopicHasDeleteAndCompactPolicy() {
+        String topicName = "myTopic";
+        Map<String, String> topicConfigs = Collections.singletonMap("cleanup.policy", "delete,compact");
+        Cluster cluster = createCluster(1);
+        try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, topicName, Collections.singletonList(topicPartitionInfo), topicConfigs);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            ConfigException e = assertThrows(ConfigException.class, () -> {
+                admin.verifyTopicCleanupPolicyOnlyCompact("myTopic", "worker.topic", "purpose");
+            });
+            assertTrue(e.getMessage().contains("to guarantee consistency and durability"));
+        }
+    }
+
+    @Test
+    public void verifyingGettingTopicCleanupPolicies() {
+        String topicName = "myTopic";
+        Map<String, String> topicConfigs = Collections.singletonMap("cleanup.policy", "compact");
+        Cluster cluster = createCluster(1);
+        try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, topicName, Collections.singletonList(topicPartitionInfo), topicConfigs);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            Set<String> policies = admin.topicCleanupPolicy("myTopic");
+            assertEquals(1, policies.size());
+            assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, policies.iterator().next());
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {
@@ -363,4 +553,33 @@ public class TopicAdminTest {
         }
         return new MetadataResponse(response);
     }
+
+    private DescribeConfigsResponse describeConfigsResponseWithUnsupportedVersion(NewTopic... topics) {
+        return describeConfigsResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
+    }
+
+    private DescribeConfigsResponse describeConfigsResponseWithClusterAuthorizationException(NewTopic... topics) {
+        return describeConfigsResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+    }
+
+    private DescribeConfigsResponse describeConfigsResponseWithTopicAuthorizationException(NewTopic... topics) {
+        return describeConfigsResponse(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
+    }
+
+    private DescribeConfigsResponse describeConfigsResponse(ApiError error, NewTopic... topics) {
+        if (error == null) error = new ApiError(Errors.NONE, "");
+        Map<ConfigResource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+        for (NewTopic topic : topics) {
+            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic.name());
+            DescribeConfigsResponse.ConfigSource source = DescribeConfigsResponse.ConfigSource.TOPIC_CONFIG;
+            Collection<DescribeConfigsResponse.ConfigEntry> entries = new ArrayList<>();
+            topic.configs().forEach((k, v) -> entries.add(new DescribeConfigsResponse.ConfigEntry(
+                    k, v, source, false, false, Collections.emptySet()
+            )));
+            DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, entries);
+            configs.put(resource, config);
+        }
+        return new DescribeConfigsResponse(1000, configs);
+    }
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 2e40769..5f6a729 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -183,7 +183,9 @@ public class EmbeddedConnectCluster {
         WorkerHandle toRemove = null;
         for (Iterator<WorkerHandle> it = connectCluster.iterator(); it.hasNext(); toRemove = it.next()) {
         }
-        removeWorker(toRemove);
+        if (toRemove != null) {
+            removeWorker(toRemove);
+        }
     }
 
     /**
@@ -212,6 +214,24 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Determine whether the Connect cluster has any workers running.
+     *
+     * @return true if any worker is running, or false otherwise
+     */
+    public boolean anyWorkersRunning() {
+        return workers().stream().anyMatch(WorkerHandle::isRunning);
+    }
+
+    /**
+     * Determine whether the Connect cluster has all workers running.
+     *
+     * @return true if all workers are running, or false otherwise
+     */
+    public boolean allWorkersRunning() {
+        return workers().stream().allMatch(WorkerHandle::isRunning);
+    }
+
     @SuppressWarnings("deprecation")
     public void startConnect() {
         log.info("Starting Connect cluster '{}' with {} workers", connectClusterName, numInitialWorkers);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index b6c96f1..9af828e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -174,7 +174,9 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
     private void stop(boolean deleteLogDirs, boolean stopZK) {
         try {
-            producer.close();
+            if (producer != null) {
+                producer.close();
+            }
         } catch (Exception e) {
             log.error("Could not shutdown producer ", e);
             throw new RuntimeException("Could not shutdown producer", e);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
index 64b2b12..4d94794 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/WorkerHandle.java
@@ -58,6 +58,15 @@ public class WorkerHandle {
     }
 
     /**
+     * Determine if this worker is running.
+     *
+     * @return true if the worker is running, or false otherwise
+     */
+    public boolean isRunning() {
+        return worker.isRunning();
+    }
+
+    /**
      * Get the worker's name corresponding to this handle.
      *
      * @return the worker's name