Posted to commits@kafka.apache.org by jg...@apache.org on 2021/03/04 19:29:16 UTC

[kafka] branch trunk updated: MINOR: Enable topic deletion in the KIP-500 controller (#10184)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eebc6f2  MINOR: Enable topic deletion in the KIP-500 controller (#10184)
eebc6f2 is described below

commit eebc6f279e61bcd9a331a3a0b305b57ffdc1ab66
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Mar 4 11:28:20 2021 -0800

    MINOR: Enable topic deletion in the KIP-500 controller (#10184)
    
    This patch enables topic deletion support in the new KIP-500 controller. It also fixes the following:
    - Fix a bug where feature level records were not correctly replayed.
    - Fix a bug in TimelineHashMap#remove where the wrong type was being returned.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Justine Olshan <jo...@confluent.io>, Ron Dagostino <rd...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Jun Rao <ju...@gmail.com>
    
    Co-authored-by: Jason Gustafson <ja...@confluent.io>
---
 checkstyle/import-control-core.xml                 |   2 +
 checkstyle/suppressions.xml                        |   2 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 203 ++++++++++++++++---
 core/src/test/java/kafka/test/MockController.java  | 222 +++++++++++++++++++++
 .../unit/kafka/server/ControllerApisTest.scala     | 217 +++++++++++++++++++-
 .../org/apache/kafka/controller/BrokersToIsrs.java |   7 +
 .../controller/ConfigurationControlManager.java    |   6 +-
 .../org/apache/kafka/controller/Controller.java    |  26 +++
 .../apache/kafka/controller/QuorumController.java  |  68 +++++--
 .../controller/ReplicationControlManager.java      | 131 ++++++++++--
 .../org/apache/kafka/controller/ResultOrError.java |   2 +-
 .../kafka/timeline/SnapshottableHashTable.java     |   2 +-
 .../org/apache/kafka/timeline/TimelineHashMap.java |   7 +-
 .../controller/ReplicationControlManagerTest.java  | 195 ++++++++++++++----
 .../apache/kafka/timeline/TimelineHashMapTest.java |   2 +
 15 files changed, 980 insertions(+), 112 deletions(-)
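
For orientation before the diff: a minimal, hypothetical sketch (not part of this commit) of the three methods the patch adds to the Controller interface -- findTopicIds, findTopicNames, and deleteTopics -- assuming an already-running Controller instance. The method signatures are taken from the Controller.java changes below; the class and helper names are illustrative only.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ResultOrError;

public class TopicDeletionSketch {
    // Resolve topic names to IDs, then delete by ID: the new controller API
    // deletes topics by Uuid, so a name-based caller resolves names first,
    // much as ControllerApis.deleteTopics does.
    static void deleteByNames(Controller controller, Collection<String> names) throws Exception {
        Map<String, ResultOrError<Uuid>> ids = controller.findTopicIds(names).get();
        for (Map.Entry<String, ResultOrError<Uuid>> entry : ids.entrySet()) {
            if (entry.getValue().isError()) {
                // e.g. UNKNOWN_TOPIC_OR_PARTITION when the name does not resolve
                System.out.println(entry.getKey() + ": " + entry.getValue().error());
            } else {
                Map<Uuid, ApiError> results = controller.deleteTopics(
                    Collections.singletonList(entry.getValue().result())).get();
                System.out.println(entry.getKey() + ": " + results.values());
            }
        }
    }
}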

diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index e9653ba..e5abcca 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -59,6 +59,8 @@
   </subpackage>
 
   <subpackage name="test">
+    <allow pkg="org.apache.kafka.controller"/>
+    <allow pkg="org.apache.kafka.metadata"/>
     <allow pkg="kafka.test.annotation"/>
     <allow pkg="kafka.test.junit"/>
     <allow pkg="kafka.network"/>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 33273fe..242fef6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -265,7 +265,7 @@
 
     <!-- metadata -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(ReplicationControlManager).java"/>
+              files="(ReplicationControlManager|ReplicationControlManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(QuorumController|ReplicationControlManager).java"/>
     <suppress checks="CyclomaticComplexity"
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 5670448..1d9fd44 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -18,21 +18,26 @@
 package kafka.server
 
 import java.util
+import java.util.Collections
+import java.util.concurrent.ExecutionException
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.Logging
 import org.apache.kafka.clients.admin.AlterConfigOp
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
+import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
 import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException}
+import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.Resource
@@ -46,6 +51,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+
 /**
  * Request handler for Controller APIs
  */
@@ -70,6 +76,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH => handleFetch(request)
         case ApiKeys.METADATA => handleMetadataRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
+        case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
         case ApiKeys.VOTE => handleVote(request)
         case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
@@ -84,10 +91,11 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
-        case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
+        case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
       }
     } catch {
       case e: FatalExitError => throw e
+      case e: ExecutionException => requestHelper.handleError(request, e.getCause)
       case e: Throwable => requestHelper.handleError(request, e)
     }
   }
@@ -124,14 +132,14 @@ class ControllerApis(val requestChannel: RequestChannel,
       val metadataResponseData = new MetadataResponseData()
       metadataResponseData.setThrottleTimeMs(requestThrottleMs)
       controllerNodes.foreach { node =>
-        metadataResponseData.brokers().add(new MetadataResponseBroker()
+        metadataResponseData.brokers.add(new MetadataResponseBroker()
           .setHost(node.host)
           .setNodeId(node.id)
           .setPort(node.port)
           .setRack(node.rack))
       }
       metadataResponseData.setClusterId(metaProperties.clusterId.toString)
-      if (controller.isActive()) {
+      if (controller.isActive) {
         metadataResponseData.setControllerId(config.nodeId)
       } else {
         metadataResponseData.setControllerId(MetadataResponse.NO_CONTROLLER_ID)
@@ -153,17 +161,164 @@ class ControllerApis(val requestChannel: RequestChannel,
       requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+    val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+      request.context.apiVersion,
+      authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
+      names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new DeleteTopicsResponseData().
+        setResponses(new DeletableTopicResultCollection(responses.iterator)).
+        setThrottleTimeMs(throttleTimeMs)
+      new DeleteTopicsResponse(responseData)
+    })
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+                   apiVersion: Int,
+                   hasClusterAuth: Boolean,
+                   getDescribableTopics: Iterable[String] => Set[String],
+                   getDeletableTopics: Iterable[String] => Set[String]): util.List[DeletableTopicResult] = {
+    // Check if topic deletion is enabled at all.
+    if (!config.deleteTopicEnable) {
+      if (apiVersion < 3) {
+        throw new InvalidRequestException("Topic deletion is disabled.")
+      } else {
+        throw new TopicDeletionDisabledException()
+      }
+    }
+    // The first step is to load up the names and IDs that have been provided by the
+    // request.  This is a bit messy because we support multiple ways of referring to
+    // topics (both by name and by id) and because we need to check for duplicates or
+    // other invalid inputs.
+    val responses = new util.ArrayList[DeletableTopicResult]
+    def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+      responses.add(new DeletableTopicResult().
+        setName(name).
+        setTopicId(id).
+        setErrorCode(error.error.code).
+        setErrorMessage(error.message))
+    }
+    val providedNames = new util.HashSet[String]
+    val duplicateProvidedNames = new util.HashSet[String]
+    val providedIds = new util.HashSet[Uuid]
+    val duplicateProvidedIds = new util.HashSet[Uuid]
+    def addProvidedName(name: String): Unit = {
+      if (duplicateProvidedNames.contains(name) || !providedNames.add(name)) {
+        duplicateProvidedNames.add(name)
+        providedNames.remove(name)
+      }
+    }
+    request.topicNames.forEach(addProvidedName)
+    request.topics.forEach {
+      topic => if (topic.name == null) {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+            "Neither topic name nor id were specified."))
+        } else if (duplicateProvidedIds.contains(topic.topicId) || !providedIds.add(topic.topicId)) {
+          duplicateProvidedIds.add(topic.topicId)
+          providedIds.remove(topic.topicId)
+        }
+      } else {
+        if (topic.topicId.equals(ZERO_UUID)) {
+          addProvidedName(topic.name)
+        } else {
+          appendResponse(topic.name, topic.topicId, new ApiError(INVALID_REQUEST,
+            "You may not specify both topic name and topic id."))
+        }
+      }
+    }
+    // Create error responses for duplicates.
+    duplicateProvidedNames.forEach(name => appendResponse(name, ZERO_UUID,
+      new ApiError(INVALID_REQUEST, "Duplicate topic name.")))
+    duplicateProvidedIds.forEach(id => appendResponse(null, id,
+      new ApiError(INVALID_REQUEST, "Duplicate topic id.")))
+    // At this point we have all the valid names and IDs that have been provided.
+    // However, the Authorizer needs topic names as inputs, not topic IDs.  So
+    // we need to resolve all IDs to names.
+    val toAuthenticate = new util.HashSet[String]
+    toAuthenticate.addAll(providedNames)
+    val idToName = new util.HashMap[Uuid, String]
+    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
+      if (nameOrError.isError) {
+        appendResponse(null, id, nameOrError.error())
+      } else {
+        toAuthenticate.add(nameOrError.result())
+        idToName.put(id, nameOrError.result())
+      }
+    }
+    // Get the list of deletable topics (those we can delete) and the list of describeable
+    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
+    // exist, even when it does.
+    val topicsToAuthenticate = toAuthenticate.asScala
+    val (describeable, deletable) = if (hasClusterAuth) {
+      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
+    } else {
+      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
+    }
+    // For each topic that was provided by ID, check if authorization failed.
+    // If so, remove it from the idToName map and create an error response for it.
+    val iterator = idToName.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      val id = entry.getKey
+      val name = entry.getValue
+      if (!deletable.contains(name)) {
+        if (describeable.contains(name)) {
+          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        } else {
+          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        }
+        iterator.remove()
+      }
+    }
+    // For each topic that was provided by name, check if authorization failed.
+    // If so, create an error response for it.  Otherwise, add it to the idToName map.
+    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
+      if (!describeable.contains(name)) {
+        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+      } else if (idOrError.isError) {
+        appendResponse(name, ZERO_UUID, idOrError.error)
+      } else if (deletable.contains(name)) {
+        val id = idOrError.result()
+        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
+          // This is kind of a weird case: what if we supply topic ID X and also a name
+          // that maps to ID X?  In that case, _if authorization succeeds_, we end up
+          // here.  If authorization doesn't succeed, we refrain from commenting on the
+          // situation since it would reveal topic ID mappings.
+          duplicateProvidedIds.add(id)
+          idToName.remove(id)
+          appendResponse(name, id, new ApiError(INVALID_REQUEST,
+            "The provided topic name maps to an ID that was already supplied."))
+        }
+      } else {
+        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+      }
+    }
+    // Finally, the idToName map contains all the topics that we are authorized to delete.
+    // Perform the deletion and create responses for each one.
+    val idToError = controller.deleteTopics(idToName.keySet).get()
+    idToError.forEach { (id, error) =>
+        appendResponse(idToName.get(id), id, error)
+    }
+    // Shuffle the responses so that users cannot use patterns in their positions to
+    // distinguish between absent topics and topics we are not permitted to see.
+    Collections.shuffle(responses)
+    responses
+  }
+
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
     val createTopicRequest = request.body[CreateTopicsRequest]
     val (authorizedCreateRequest, unauthorizedTopics) =
       if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) {
         (createTopicRequest.data, Seq.empty)
       } else {
-        val duplicate = createTopicRequest.data.duplicate()
+        val duplicate = createTopicRequest.data.duplicate
         val authorizedTopics = new CreatableTopicCollection()
         val unauthorizedTopics = mutable.Buffer.empty[String]
 
-        createTopicRequest.data.topics.asScala.foreach { topicData =>
+        createTopicRequest.data.topics.forEach { topicData =>
           if (authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) {
             authorizedTopics.add(topicData)
           } else {
@@ -177,7 +332,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       unauthorizedTopics.foreach { topic =>
         val result = new CreatableTopicResult()
           .setName(topic)
-          .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          .setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
         response.topics.add(result)
       }
 
@@ -214,7 +369,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
       } else if (!apiVersionRequest.isValid) {
-        apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
+        apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)
       } else {
         apiVersionManager.apiVersionResponse(requestThrottleMs)
       }
@@ -245,7 +400,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
     val alterIsrRequest = request.body[AlterIsrRequest]
-    val future = controller.alterIsr(alterIsrRequest.data())
+    val future = controller.alterIsr(alterIsrRequest.data)
     future.whenComplete((result, exception) => {
       val response = if (exception != null) {
         alterIsrRequest.getErrorResponse(exception)
@@ -267,14 +422,14 @@ class ControllerApis(val requestChannel: RequestChannel,
         if (e != null) {
           new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
             setThrottleTimeMs(requestThrottleMs).
-            setErrorCode(Errors.forException(e).code()))
+            setErrorCode(Errors.forException(e).code))
         } else {
           new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().
             setThrottleTimeMs(requestThrottleMs).
-            setErrorCode(Errors.NONE.code()).
-            setIsCaughtUp(reply.isCaughtUp()).
-            setIsFenced(reply.isFenced()).
-            setShouldShutDown(reply.shouldShutDown()))
+            setErrorCode(Errors.NONE.code).
+            setIsCaughtUp(reply.isCaughtUp).
+            setIsFenced(reply.isFenced).
+            setShouldShutDown(reply.shouldShutDown))
         }
       }
       requestHelper.sendResponseMaybeThrottle(request,
@@ -292,7 +447,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         if (e != null) {
           new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
             setThrottleTimeMs(requestThrottleMs).
-            setErrorCode(Errors.forException(e).code()))
+            setErrorCode(Errors.forException(e).code))
         } else {
           new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
             setThrottleTimeMs(requestThrottleMs))
@@ -318,7 +473,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         } else {
           new BrokerRegistrationResponse(new BrokerRegistrationResponseData().
             setThrottleTimeMs(requestThrottleMs).
-            setErrorCode(Errors.NONE.code()).
+            setErrorCode(Errors.NONE.code).
             setBrokerEpoch(reply.epoch))
         }
       }
@@ -346,7 +501,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     val quotaRequest = request.body[AlterClientQuotasRequest]
     authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
 
-    controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly())
+    controller.alterClientQuotas(quotaRequest.entries, quotaRequest.validateOnly)
       .whenComplete((results, exception) => {
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -362,15 +517,15 @@ class ControllerApis(val requestChannel: RequestChannel,
     authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
     val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
     alterConfigsRequest.data.resources.forEach { resource =>
-      val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName())
+      val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
       val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
       resource.configs.forEach { config =>
-        altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
-          AlterConfigOp.OpType.forId(config.configOperation()), config.value()))
+        altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
+          AlterConfigOp.OpType.forId(config.configOperation), config.value))
       }
       configChanges.put(configResource, altersByName)
     }
-    controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly())
+    controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data.validateOnly)
       .whenComplete((results, exception) => {
         if (exception != null) {
           requestHelper.handleError(request, exception)
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
new file mode 100644
index 0000000..d38cab5
--- /dev/null
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ResultOrError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public class MockController implements Controller {
+    private final static NotControllerException NOT_CONTROLLER_EXCEPTION =
+        new NotControllerException("This is not the correct controller for this cluster.");
+
+    public static class Builder {
+        private final Map<String, MockTopic> initialTopics = new HashMap<>();
+
+        public Builder newInitialTopic(String name, Uuid id) {
+            initialTopics.put(name, new MockTopic(name, id));
+            return this;
+        }
+
+        public MockController build() {
+            return new MockController(initialTopics.values());
+        }
+    }
+
+    private volatile boolean active = true;
+
+    private MockController(Collection<MockTopic> initialTopics) {
+        for (MockTopic topic : initialTopics) {
+            topics.put(topic.id, topic);
+            topicNameToId.put(topic.name, topic.id);
+        }
+    }
+
+    @Override
+    public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+        throw new UnsupportedOperationException();
+    }
+
+    static class MockTopic {
+        private final String name;
+        private final Uuid id;
+
+        MockTopic(String name, Uuid id) {
+            this.name = name;
+            this.id = id;
+        }
+    }
+
+    private final Map<String, Uuid> topicNameToId = new HashMap<>();
+
+    private final Map<Uuid, MockTopic> topics = new HashMap<>();
+
+    @Override
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
+            findTopicIds(Collection<String> topicNames) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>();
+        for (String topicName : topicNames) {
+            if (!topicNameToId.containsKey(topicName)) {
+                results.put(topicName, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            } else {
+                results.put(topicName, new ResultOrError<>(topicNameToId.get(topicName)));
+            }
+        }
+        return CompletableFuture.completedFuture(results);
+    }
+
+    @Override
+    synchronized public CompletableFuture<Map<Uuid, ResultOrError<String>>>
+            findTopicNames(Collection<Uuid> topicIds) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>();
+        for (Uuid topicId : topicIds) {
+            MockTopic topic = topics.get(topicId);
+            if (topic == null) {
+                results.put(topicId, new ResultOrError<>(new ApiError(Errors.UNKNOWN_TOPIC_ID)));
+            } else {
+                results.put(topicId, new ResultOrError<>(topic.name));
+            }
+        }
+        return CompletableFuture.completedFuture(results);
+    }
+
+    @Override
+    synchronized public CompletableFuture<Map<Uuid, ApiError>>
+            deleteTopics(Collection<Uuid> topicIds) {
+        if (!active) {
+            CompletableFuture<Map<Uuid, ApiError>> future = new CompletableFuture<>();
+            future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
+            return future;
+        }
+        Map<Uuid, ApiError> results = new HashMap<>();
+        for (Uuid topicId : topicIds) {
+            MockTopic topic = topics.remove(topicId);
+            if (topic == null) {
+                results.put(topicId, new ApiError(Errors.UNKNOWN_TOPIC_ID));
+            } else {
+                topicNameToId.remove(topic.name);
+                results.put(topicId, ApiError.NONE);
+            }
+        }
+        return CompletableFuture.completedFuture(results);
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(Map<ConfigResource, Collection<String>> resources) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<FeatureMapAndEpoch> finalizedFeatures() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
+            boolean validateOnly) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+            Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<BrokerHeartbeatReply>
+            processBrokerHeartbeat(BrokerHeartbeatRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<BrokerRegistrationReply>
+            registerBroker(BrokerRegistrationRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<Map<ClientQuotaEntity, ApiError>>
+            alterClientQuotas(Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void beginShutdown() {
+        this.active = false;
+    }
+
+    public void setActive(boolean active) {
+        this.active = active;
+    }
+
+    @Override
+    public long curClaimEpoch() {
+        return active ? 1 : -1;
+    }
+
+    @Override
+    public void close() {
+        beginShutdown();
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 2f1c91e..8e400ed 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package unit.kafka.server
+package kafka.server
 
 import java.net.InetAddress
+import java.util
 import java.util.Properties
+import java.util.concurrent.ExecutionException
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager, SimpleApiVersionManager}
+import kafka.test.MockController
 import kafka.utils.MockTime
 import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.errors.{InvalidRequestException, NotControllerException, TopicDeletionDisabledException}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.message.BrokerRegistrationRequestData
+import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
+import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, DeleteTopicsRequestData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerRegistrationRequest, BrokerRegistrationResponse, RequestContext, RequestHeader, RequestTestUtils}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.controller.Controller
@@ -56,7 +62,6 @@ class ControllerApisTest {
   private val clientControllerQuotaManager: ControllerMutationQuotaManager = mock(classOf[ControllerMutationQuotaManager])
   private val replicaQuotaManager: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
   private val raftManager: RaftManager[ApiMessageAndVersion] = mock(classOf[RaftManager[ApiMessageAndVersion]])
-  private val authorizer: Authorizer = mock(classOf[Authorizer])
 
   private val quotas = QuotaManagers(
     clientQuotaManager,
@@ -67,15 +72,15 @@ class ControllerApisTest {
     replicaQuotaManager,
     replicaQuotaManager,
     None)
-  private val controller: Controller = mock(classOf[Controller])
 
-  private def createControllerApis(): ControllerApis = {
-    val props = new Properties()
+  private def createControllerApis(authorizer: Option[Authorizer],
+                                   controller: Controller,
+                                   props: Properties = new Properties()): ControllerApis = {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     new ControllerApis(
       requestChannel,
-      Some(authorizer),
+      authorizer,
       quotas,
       time,
       Map.empty,
@@ -122,6 +127,7 @@ class ControllerApisTest {
     val request = buildRequest(brokerRegistrationRequest)
     val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
 
+    val authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(
       any(classOf[AuthorizableRequestContext]),
       any(classOf[java.util.List[Action]])
@@ -129,7 +135,7 @@ class ControllerApisTest {
       java.util.Collections.singletonList(AuthorizationResult.DENIED)
     )
 
-    createControllerApis().handle(request)
+    createControllerApis(Some(authorizer), mock(classOf[Controller])).handle(request)
     verify(requestChannel).sendResponse(
       ArgumentMatchers.eq(request),
       capturedResponse.capture(),
@@ -142,6 +148,197 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData().setTopicNames(
+      util.Arrays.asList("foo", "bar", "quux", "quux"))
+    val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+        setErrorMessage("This server does not host this topic-partition."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+    val controller = new MockController.Builder().newInitialTopic("foo", fooId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(quuxId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(quuxId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+        setErrorMessage("This server does not host this topic ID."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      true,
+      _ => Set.empty,
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(bazId))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Neither topic name nor id were specified."),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("You may not specify both topic name and topic id."),
+      new DeletableTopicResult().setName("bar").setTopicId(barId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("The provided topic name maps to an ID that was already supplied."),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."),
+      new DeletableTopicResult().setName(null).setTopicId(bazId).
+        setErrorCode(Errors.INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic id."))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      names => names.toSet,
+      names => names.toSet).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicExisting(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val bazId = Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")
+    val quuxId = Uuid.fromString("5URoQzW_RJiERVZXJgUVLg")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).
+      newInitialTopic("bar", barId).
+      newInitialTopic("baz", bazId).
+      newInitialTopic("quux", quuxId).build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    request.topics().add(new DeleteTopicState().setName("baz").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+    val response = Set(new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("baz").setTopicId(ZERO_UUID).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName("foo").setTopicId(fooId).
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+    assertEquals(response, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo", "baz"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotAuthorizedToDeleteWithTopicNotExisting(): Unit = {
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().build()
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    val expectedResponse = Set(new DeletableTopicResult().setName("foo").
+        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message),
+      new DeletableTopicResult().setName("bar").
+        setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code).
+        setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message),
+      new DeletableTopicResult().setName(null).setTopicId(barId).
+        setErrorCode(Errors.UNKNOWN_TOPIC_ID.code).
+        setErrorMessage(Errors.UNKNOWN_TOPIC_ID.message))
+    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+      ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+      false,
+      _ => Set("foo"),
+      _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testNotControllerErrorPreventsDeletingTopics(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).build()
+    controller.setActive(false)
+    val controllerApis = createControllerApis(None, controller)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
+    request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
+    assertEquals(classOf[NotControllerException], assertThrows(
+      classOf[ExecutionException], () => controllerApis.deleteTopics(request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        false,
+        _ => Set("foo", "bar"),
+        _ => Set("foo", "bar"))).getCause.getClass)
+  }
+
+  @Test
+  def testDeleteTopicsDisabled(): Unit = {
+    val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+    val controller = new MockController.Builder().
+      newInitialTopic("foo", fooId).build()
+    val props = new Properties()
+    props.put(KafkaConfig.DeleteTopicEnableProp, "false")
+    val controllerApis = createControllerApis(None, controller, props)
+    val request = new DeleteTopicsRequestData()
+    request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
+    assertThrows(classOf[TopicDeletionDisabledException], () => controllerApis.deleteTopics(request,
+        ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+        false,
+        _ => Set("foo", "bar"),
+        _ => Set("foo", "bar")))
+    assertThrows(classOf[InvalidRequestException], () => controllerApis.deleteTopics(request,
+        1,
+        false,
+        _ => Set("foo", "bar"),
+        _ => Set("foo", "bar")))
+  }
+
   @AfterEach
   def tearDown(): Unit = {
     quotas.shutdown()
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 46148e7..9d54c20 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -214,6 +214,13 @@ public class BrokersToIsrs {
         }
     }
 
+    void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+        Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
+        if (topicMap != null) {
+            topicMap.remove(topicId);
+        }
+    }
+
     private void add(int brokerId, Uuid topicId, int newPartition, boolean leader) {
         if (leader) {
             newPartition = newPartition | LEADER_FLAG;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 5bc82ec..dcfe92d46 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -303,7 +303,7 @@ public class ConfigurationControlManager {
      *
      * @param record            The ConfigRecord.
      */
-    void replay(ConfigRecord record) {
+    public void replay(ConfigRecord record) {
         Type type = Type.forId(record.resourceType());
         ConfigResource configResource = new ConfigResource(type, record.resourceName());
         TimelineHashMap<String, String> configs = configData.get(configResource);
@@ -364,4 +364,8 @@ public class ConfigurationControlManager {
         }
         return results;
     }
+
+    void deleteTopicConfigs(String name) {
+        configData.remove(new ConfigResource(Type.TOPIC, name));
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 1ce63e0..2639463 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
@@ -70,6 +71,31 @@ public interface Controller extends AutoCloseable {
     CompletableFuture<Void> unregisterBroker(int brokerId);
 
     /**
+     * Find the ids for topic names.
+     *
+     * @param topicNames    The topic names to resolve.
+     * @return              A future yielding a map from topic name to id.
+     */
+    CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> topicNames);
+
+    /**
+     * Find the names for topic ids.
+     *
+     * @param topicIds      The topic ids to resolve.
+     * @return              A future yielding a map from topic id to name.
+     */
+    CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> topicIds);
+
+    /**
+     * Delete a batch of topics.
+     *
+     * @param topicIds      The IDs of the topics to delete.
+     *
+     * @return              A future yielding the response.
+     */
+    CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> topicIds);
+
+    /**
      * Describe the current configuration of various resources.
      *
      * @param resources     A map from resources to the collection of config keys that we
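
As a companion to the earlier name-based sketch, a hypothetical ID-based variant (again illustrative, not part of the commit): resolve IDs to names first, then delete only the IDs that resolved, mirroring the resolution step in ControllerApis.deleteTopics.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ResultOrError;

public class TopicDeletionByIdSketch {
    static void deleteByIds(Controller controller, Collection<Uuid> ids) throws Exception {
        Map<Uuid, ResultOrError<String>> names = controller.findTopicNames(ids).get();
        List<Uuid> resolved = new ArrayList<>();
        for (Map.Entry<Uuid, ResultOrError<String>> entry : names.entrySet()) {
            if (entry.getValue().isError()) {
                // e.g. UNKNOWN_TOPIC_ID for an ID the controller does not know
                System.out.println(entry.getKey() + ": " + entry.getValue().error());
            } else {
                resolved.add(entry.getKey());
            }
        }
        // deleteTopics yields a per-topic ApiError map; ApiError.NONE means success.
        Map<Uuid, ApiError> results = controller.deleteTopics(resolved).get();
        results.forEach((id, error) -> System.out.println(id + ": " + error));
    }
}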
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 759db1e..6ee1b7e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
@@ -44,12 +45,14 @@ import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
 import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.QuotaRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
@@ -448,7 +451,7 @@ public final class QuorumController implements Controller {
                 writeOffset = offset;
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
                 for (ApiMessageAndVersion message : result.records()) {
-                    replay(message.message());
+                    replay(message.message(), offset);
                 }
                 snapshotRegistry.createSnapshot(offset);
                 log.debug("Read-write operation {} will be completed when the log " +
@@ -513,7 +516,7 @@ public final class QuorumController implements Controller {
                         }
                     }
                     for (ApiMessage message : messages) {
-                        replay(message);
+                        replay(message, offset);
                     }
                 } else {
                     // If the controller is active, the records were already replayed,
@@ -623,7 +626,7 @@ public final class QuorumController implements Controller {
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message) {
+    private void replay(ApiMessage message, long offset) {
         try {
             MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
             switch (type) {
@@ -633,12 +636,6 @@ public final class QuorumController implements Controller {
                 case UNREGISTER_BROKER_RECORD:
                     clusterControl.replay((UnregisterBrokerRecord) message);
                     break;
-                case FENCE_BROKER_RECORD:
-                    clusterControl.replay((FenceBrokerRecord) message);
-                    break;
-                case UNFENCE_BROKER_RECORD:
-                    clusterControl.replay((UnfenceBrokerRecord) message);
-                    break;
                 case TOPIC_RECORD:
                     replicationControl.replay((TopicRecord) message);
                     break;
@@ -648,12 +645,24 @@ public final class QuorumController implements Controller {
                 case CONFIG_RECORD:
                     configurationControl.replay((ConfigRecord) message);
                     break;
-                case QUOTA_RECORD:
-                    clientQuotaControlManager.replay((QuotaRecord) message);
-                    break;
                 case PARTITION_CHANGE_RECORD:
                     replicationControl.replay((PartitionChangeRecord) message);
                     break;
+                case FENCE_BROKER_RECORD:
+                    clusterControl.replay((FenceBrokerRecord) message);
+                    break;
+                case UNFENCE_BROKER_RECORD:
+                    clusterControl.replay((UnfenceBrokerRecord) message);
+                    break;
+                case REMOVE_TOPIC_RECORD:
+                    replicationControl.replay((RemoveTopicRecord) message);
+                    break;
+                case FEATURE_LEVEL_RECORD:
+                    featureControl.replay((FeatureLevelRecord) message, offset);
+                    break;
+                case QUOTA_RECORD:
+                    clientQuotaControlManager.replay((QuotaRecord) message);
+                    break;
                 default:
                     throw new RuntimeException("Unhandled record type " + type);
             }
@@ -793,6 +802,9 @@ public final class QuorumController implements Controller {
 
     @Override
     public CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request) {
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new AlterIsrResponseData());
+        }
         return appendWriteEvent("alterIsr", () ->
             replicationControl.alterIsr(request));
     }
@@ -800,6 +812,9 @@ public final class QuorumController implements Controller {
     @Override
     public CompletableFuture<CreateTopicsResponseData>
             createTopics(CreateTopicsRequestData request) {
+        if (request.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(new CreateTopicsResponseData());
+        }
         return appendWriteEvent("createTopics", () ->
             replicationControl.createTopics(request));
     }
@@ -811,6 +826,27 @@ public final class QuorumController implements Controller {
     }
 
     @Override
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(Collection<String> names) {
+        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds",
+            () -> replicationControl.findTopicIds(lastCommittedOffset, names));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames",
+            () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
+    }
+
+    @Override
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics",
+            () -> replicationControl.deleteTopics(ids));
+    }
+
+    @Override
     public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
             describeConfigs(Map<ConfigResource, Collection<String>> resources) {
         return appendReadEvent("describeConfigs", () ->
@@ -847,7 +883,10 @@ public final class QuorumController implements Controller {
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
-        Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+            Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+        if (newConfigs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
         return appendWriteEvent("legacyAlterConfigs", () -> {
             ControllerResult<Map<ConfigResource, ApiError>> result =
                 configurationControl.legacyAlterConfigs(newConfigs);
@@ -901,6 +940,9 @@ public final class QuorumController implements Controller {
     @Override
     public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
             Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+        if (quotaAlterations.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
         return appendWriteEvent("alterClientQuotas", () -> {
             ControllerResult<Map<ClientQuotaEntity, ApiError>> result =
                 clientQuotaControlManager.alterClientQuotas(quotaAlterations);
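
Taken together, the new Controller methods enable a resolve-then-delete flow for the ControllerApis layer. A hedged sketch of such a chain (illustrative only, not the actual ControllerApis.scala logic; the isError()/result() accessors on ResultOrError are assumed names):

    // Resolve topic names to IDs, then delete whatever resolved successfully.
    static CompletableFuture<Map<Uuid, ApiError>> resolveAndDelete(
            Controller controller, Collection<String> names) {
        return controller.findTopicIds(names).thenCompose(byName -> {
            List<Uuid> ids = byName.values().stream().
                filter(r -> !r.isError()).          // assumed accessor
                map(ResultOrError::result).         // assumed accessor
                collect(Collectors.toList());
            return controller.deleteTopics(ids);
        });
    }
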
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index ca57105..59798c4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -21,28 +21,31 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
-import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
 import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
-import org.apache.kafka.common.message.ElectLeadersResponseData;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
@@ -60,6 +63,7 @@ import org.slf4j.Logger;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -72,6 +76,9 @@ import java.util.Random;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
 
 
 /**
@@ -92,10 +99,12 @@ public class ReplicationControlManager {
     public static final int NO_LEADER_CHANGE = -2;
 
     static class TopicControlInfo {
+        private final String name;
         private final Uuid id;
         private final TimelineHashMap<Integer, PartitionControlInfo> parts;
 
-        TopicControlInfo(SnapshotRegistry snapshotRegistry, Uuid id) {
+        TopicControlInfo(String name, SnapshotRegistry snapshotRegistry, Uuid id) {
+            this.name = name;
             this.id = id;
             this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
         }
@@ -261,7 +270,7 @@ public class ReplicationControlManager {
     /**
      * A reference to the controller's cluster control manager.
      */
-    final ClusterControlManager clusterControl;
+    private final ClusterControlManager clusterControl;
 
     /**
      * Maps topic names to topic UUIDs.
@@ -299,7 +308,8 @@ public class ReplicationControlManager {
 
     public void replay(TopicRecord record) {
         topicsByName.put(record.name(), record.topicId());
-        topics.put(record.topicId(), new TopicControlInfo(snapshotRegistry, record.topicId()));
+        topics.put(record.topicId(),
+            new TopicControlInfo(record.name(), snapshotRegistry, record.topicId()));
         log.info("Created topic {} with ID {}.", record.name(), record.topicId());
     }
 
@@ -349,6 +359,29 @@ public class ReplicationControlManager {
         log.debug("Applied ISR change record: {}", record.toString());
     }
 
+    public void replay(RemoveTopicRecord record) {
+        // Remove this topic from the topics map and the topicsByName map.
+        TopicControlInfo topic = topics.remove(record.topicId());
+        if (topic == null) {
+            throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() +
+                " to remove.");
+        }
+        topicsByName.remove(topic.name);
+
+        // Delete the configurations associated with this topic.
+        configurationControl.deleteTopicConfigs(topic.name);
+
+        // Remove the entries for this topic in brokersToIsrs.
+        for (PartitionControlInfo partition : topic.parts.values()) {
+            for (int i = 0; i < partition.isr.length; i++) {
+                brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
+            }
+        }
+        brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
+
+        log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
+    }
+
     ControllerResult<CreateTopicsResponseData>
             createTopics(CreateTopicsRequestData request) {
         Map<String, ApiError> topicErrors = new HashMap<>();
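
The replay method above is the second half of the controller's two-phase mutation model: deleteTopics() (further down in this file) only validates and emits RemoveTopicRecords, and the in-memory maps change only when those records are replayed, normally after they commit to the metadata log. A sketch of the full cycle, assuming a ReplicationControlManager and a topic-ID collection in scope:

    // Phase 1: validate and emit records; no state is touched yet.
    ControllerResult<Map<Uuid, ApiError>> result =
        replicationControl.deleteTopics(ids);
    // Phase 2: apply the committed records to the in-memory state.
    for (ApiMessageAndVersion rec : result.records()) {
        replicationControl.replay((RemoveTopicRecord) rec.message());
    }
    // Only now are the topic, its configs, and its brokersToIsrs entries gone.
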
@@ -416,12 +449,12 @@ public class ReplicationControlManager {
         Map<Integer, PartitionControlInfo> newParts = new HashMap<>();
         if (!topic.assignments().isEmpty()) {
             if (topic.replicationFactor() != -1) {
-                return new ApiError(Errors.INVALID_REQUEST,
+                return new ApiError(INVALID_REQUEST,
                     "A manual partition assignment was specified, but replication " +
                     "factor was not set to -1.");
             }
             if (topic.numPartitions() != -1) {
-                return new ApiError(Errors.INVALID_REQUEST,
+                return new ApiError(INVALID_REQUEST,
                     "A manual partition assignment was specified, but numPartitions " +
                         "was not set to -1.");
             }
@@ -458,7 +491,7 @@ public class ReplicationControlManager {
             return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
                 "Replication factor was set to an invalid non-positive value.");
         } else if (!topic.assignments().isEmpty()) {
-            return new ApiError(Errors.INVALID_REQUEST,
+            return new ApiError(INVALID_REQUEST,
                 "Replication factor was not set to -1 but a manual partition " +
                     "assignment was specified.");
         } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) {
@@ -541,6 +574,68 @@ public class ReplicationControlManager {
         return configChanges;
     }
 
+    Map<String, ResultOrError<Uuid>> findTopicIds(long offset, Collection<String> names) {
+        Map<String, ResultOrError<Uuid>> results = new HashMap<>(names.size());
+        for (String name : names) {
+            if (name == null) {
+                results.put(null, new ResultOrError<>(INVALID_REQUEST, "Invalid null topic name."));
+            } else {
+                Uuid id = topicsByName.get(name, offset);
+                if (id == null) {
+                    results.put(name, new ResultOrError<>(
+                        new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+                } else {
+                    results.put(name, new ResultOrError<>(id));
+                }
+            }
+        }
+        return results;
+    }
+
+    Map<Uuid, ResultOrError<String>> findTopicNames(long offset, Collection<Uuid> ids) {
+        Map<Uuid, ResultOrError<String>> results = new HashMap<>(ids.size());
+        for (Uuid id : ids) {
+            if (id == null || id.equals(Uuid.ZERO_UUID)) {
+                results.put(id, new ResultOrError<>(new ApiError(INVALID_REQUEST,
+                    "Attempt to find topic with invalid topicId " + id)));
+            } else {
+                TopicControlInfo topic = topics.get(id, offset);
+                if (topic == null) {
+                    results.put(id, new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID)));
+                } else {
+                    results.put(id, new ResultOrError<>(topic.name));
+                }
+            }
+        }
+        return results;
+    }
+
+    ControllerResult<Map<Uuid, ApiError>> deleteTopics(Collection<Uuid> ids) {
+        Map<Uuid, ApiError> results = new HashMap<>(ids.size());
+        List<ApiMessageAndVersion> records = new ArrayList<>(ids.size());
+        for (Uuid id : ids) {
+            try {
+                deleteTopic(id, records);
+                results.put(id, ApiError.NONE);
+            } catch (ApiException e) {
+                results.put(id, ApiError.fromThrowable(e));
+            } catch (Exception e) {
+                log.error("Unexpected deleteTopics error for {}", id, e);
+                results.put(id, ApiError.fromThrowable(e));
+            }
+        }
+        return new ControllerResult<>(records, results);
+    }
+
+    void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {
+        TopicControlInfo topic = topics.get(id);
+        if (topic == null) {
+            throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+        }
+        records.add(new ApiMessageAndVersion(new RemoveTopicRecord().
+            setTopicId(id), (short) 0));
+    }
+
     // VisibleForTesting
     PartitionControlInfo getPartition(Uuid topicId, int partitionId) {
         TopicControlInfo topic = topics.get(topicId);
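
findTopicIds and findTopicNames above take an offset because topicsByName and topics are timeline collections: a read at a past offset sees the state as of the snapshot taken there, while Long.MAX_VALUE (the latest epoch) sees the current state. A self-contained sketch of that behavior, assuming a snapshot exists at the offset being read:

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.timeline.SnapshotRegistry;
    import org.apache.kafka.timeline.TimelineHashMap;

    public class TimelineReadSketch {
        public static void main(String[] args) {
            SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
            TimelineHashMap<String, Uuid> topicsByName =
                new TimelineHashMap<>(registry, 0);
            Uuid fooId = Uuid.randomUuid();
            topicsByName.put("foo", fooId);
            registry.createSnapshot(100);   // snapshot the state at "offset" 100
            topicsByName.remove("foo");     // mutate after the snapshot
            System.out.println(topicsByName.get("foo", 100));             // fooId
            System.out.println(topicsByName.get("foo", Long.MAX_VALUE));  // null
        }
    }
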
@@ -568,7 +663,7 @@ public class ReplicationControlManager {
                 for (AlterIsrRequestData.PartitionData partitionData : topicData.partitions()) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
                 }
                 continue;
             }
@@ -578,13 +673,13 @@ public class ReplicationControlManager {
                 if (partition == null) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
+                        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
                     continue;
                 }
                 if (request.brokerId() != partition.leader) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                        setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (partitionData.leaderEpoch() != partition.leaderEpoch) {
@@ -603,14 +698,14 @@ public class ReplicationControlManager {
                 if (!Replicas.validateIsr(partition.replicas, newIsr)) {
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                        setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 if (!Replicas.contains(newIsr, partition.leader)) {
                     // An alterIsr request can't remove the current leader.
                     responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.INVALID_REQUEST.code()));
+                        setErrorCode(INVALID_REQUEST.code()));
                     continue;
                 }
                 records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
@@ -797,17 +892,17 @@ public class ReplicationControlManager {
                          List<ApiMessageAndVersion> records) {
         Uuid topicId = topicsByName.get(topic);
         if (topicId == null) {
-            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such topic as " + topic);
         }
         TopicControlInfo topicInfo = topics.get(topicId);
         if (topicInfo == null) {
-            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such topic id as " + topicId);
         }
         PartitionControlInfo partitionInfo = topicInfo.parts.get(partitionId);
         if (partitionInfo == null) {
-            return new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+            return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
         int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, unclean);
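
One detail worth noting in the new lookups: name-based lookups fail with UNKNOWN_TOPIC_OR_PARTITION, matching the error the existing name-based topic APIs return, while ID-based lookups fail with the newer UNKNOWN_TOPIC_ID.
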
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
index 6d910e4..2fedacd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.requests.ApiError;
 import java.util.Objects;
 
 
-class ResultOrError<T> {
+public class ResultOrError<T> {
     private final ApiError error;
     private final T result;
 
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index a8ea94d..2f5d7be 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -348,7 +348,7 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
         return prev;
     }
 
-    Object snapshottableRemove(Object object) {
+    T snapshottableRemove(Object object) {
         T prev = baseRemove(object);
         if (prev == null) {
             return null;
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
index bbec158..6e02517 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineHashMap.java
@@ -164,9 +164,10 @@ public class TimelineHashMap<K, V>
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public V remove(Object key) {
-        return (V) snapshottableRemove(new TimelineHashMapEntry<>(key, null));
+        TimelineHashMapEntry<K, V> result = snapshottableRemove(
+            new TimelineHashMapEntry<>(key, null));
+        return result == null ? null : result.value;
     }
 
     @Override
@@ -342,7 +343,7 @@ public class TimelineHashMap<K, V>
             if (epoch != SnapshottableHashTable.LATEST_EPOCH) {
                 throw new RuntimeException("can't modify snapshot");
             }
-            return snapshottableRemove(o) != null;
+            return snapshottableRemove(new TimelineHashMapEntry<>(o, null)) != null;
         }
     }
 
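
The two fixes above close a generics hole. The old remove() cast the internal TimelineHashMapEntry to V, which, because of type erasure, does not fail inside remove() but throws ClassCastException at the call site; the set variant passed the raw key where an entry was expected, so the lookup could never match a stored entry. A self-contained demonstration of the cast half of the bug (not Kafka code):

    import java.util.AbstractMap.SimpleEntry;

    public class UncheckedCastDemo {
        @SuppressWarnings("unchecked")
        static <V> V badRemove(Object key) {
            // Returns the internal entry instead of its value, as the old
            // remove() did; the unchecked cast is a no-op at runtime.
            return (V) new SimpleEntry<>(key, "value");
        }

        public static void main(String[] args) {
            // The ClassCastException is thrown here, at the call site where
            // javac inserts a checkcast to String, not inside badRemove().
            String v = badRemove("key");
            System.out.println(v);
        }
    }
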
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index a639614..da6e4af 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
@@ -55,37 +57,50 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
+import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
 import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
 import static org.apache.kafka.controller.ReplicationControlManager.PartitionControlInfo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 
 @Timeout(40)
 public class ReplicationControlManagerTest {
-    private static ReplicationControlManager newReplicationControlManager() {
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        LogContext logContext = new LogContext();
-        MockTime time = new MockTime();
-        MockRandom random = new MockRandom();
-        ClusterControlManager clusterControl = new ClusterControlManager(
+    private static class ReplicationControlTestContext {
+        final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        final LogContext logContext = new LogContext();
+        final MockTime time = new MockTime();
+        final MockRandom random = new MockRandom();
+        final ClusterControlManager clusterControl = new ClusterControlManager(
             logContext, time, snapshotRegistry, 1000,
             new SimpleReplicaPlacementPolicy(random));
-        clusterControl.activate();
-        ConfigurationControlManager configurationControl = new ConfigurationControlManager(
+        final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
             new LogContext(), snapshotRegistry, Collections.emptyMap());
-        return new ReplicationControlManager(snapshotRegistry,
+        final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry,
             new LogContext(),
             random,
             (short) 3,
             1,
             configurationControl,
             clusterControl);
+
+        void replay(List<ApiMessageAndVersion> records) throws Exception {
+            ControllerTestUtils.replayAll(clusterControl, records);
+            ControllerTestUtils.replayAll(configurationControl, records);
+            ControllerTestUtils.replayAll(replicationControl, records);
+        }
+
+        ReplicationControlTestContext() {
+            clusterControl.activate();
+        }
     }
 
-    private static void registerBroker(int brokerId, ReplicationControlManager replicationControl) {
+    private static void registerBroker(int brokerId, ReplicationControlTestContext ctx) {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
             setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
         brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
@@ -93,22 +108,23 @@ public class ReplicationControlManagerTest {
             setPort((short) 9092 + brokerId).
             setName("PLAINTEXT").
             setHost("localhost"));
-        replicationControl.clusterControl.replay(brokerRecord);
+        ctx.clusterControl.replay(brokerRecord);
     }
 
     private static void unfenceBroker(int brokerId,
-                                      ReplicationControlManager replicationControl) throws Exception {
-        ControllerResult<BrokerHeartbeatReply> result = replicationControl.
+                                      ReplicationControlTestContext ctx) throws Exception {
+        ControllerResult<BrokerHeartbeatReply> result = ctx.replicationControl.
             processBrokerHeartbeat(new BrokerHeartbeatRequestData().
                 setBrokerId(brokerId).setBrokerEpoch(brokerId + 100).setCurrentMetadataOffset(1).
                 setWantFence(false).setWantShutDown(false), 0);
         assertEquals(new BrokerHeartbeatReply(true, false, false, false), result.response());
-        ControllerTestUtils.replayAll(replicationControl.clusterControl, result.records());
+        ControllerTestUtils.replayAll(ctx.clusterControl, result.records());
     }
 
     @Test
     public void testCreateTopics() throws Exception {
-        ReplicationControlManager replicationControl = newReplicationControlManager();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
         request.topics().add(new CreatableTopic().setName("foo").
             setNumPartitions(-1).setReplicationFactor((short) -1));
@@ -120,12 +136,12 @@ public class ReplicationControlManagerTest {
                 setErrorMessage("Unable to replicate the partition 3 times: there are only 0 usable brokers"));
         assertEquals(expectedResponse, result.response());
 
-        registerBroker(0, replicationControl);
-        unfenceBroker(0, replicationControl);
-        registerBroker(1, replicationControl);
-        unfenceBroker(1, replicationControl);
-        registerBroker(2, replicationControl);
-        unfenceBroker(2, replicationControl);
+        registerBroker(0, ctx);
+        unfenceBroker(0, ctx);
+        registerBroker(1, ctx);
+        unfenceBroker(1, ctx);
+        registerBroker(2, ctx);
+        unfenceBroker(2, ctx);
         ControllerResult<CreateTopicsResponseData> result2 =
             replicationControl.createTopics(request);
         CreateTopicsResponseData expectedResponse2 = new CreateTopicsResponseData();
@@ -189,10 +205,11 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testRemoveLeaderships() throws Exception {
-        ReplicationControlManager replicationControl = newReplicationControlManager();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
         for (int i = 0; i < 6; i++) {
-            registerBroker(i, replicationControl);
-            unfenceBroker(i, replicationControl);
+            registerBroker(i, ctx);
+            unfenceBroker(i, ctx);
         }
         CreatableTopicResult result = createTestTopic(replicationControl, "foo",
             new int[][] {
@@ -215,10 +232,11 @@ public class ReplicationControlManagerTest {
 
     @Test
     public void testShrinkAndExpandIsr() throws Exception {
-        ReplicationControlManager replicationControl = newReplicationControlManager();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
         for (int i = 0; i < 3; i++) {
-            registerBroker(i, replicationControl);
-            unfenceBroker(i, replicationControl);
+            registerBroker(i, ctx);
+            unfenceBroker(i, ctx);
         }
         CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
             new int[][] {new int[] {0, 1, 2}});
@@ -226,13 +244,13 @@ public class ReplicationControlManagerTest {
         TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
         TopicPartition topicPartition = new TopicPartition("foo", 0);
         assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
-        long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+        long brokerEpoch = currentBrokerEpoch(ctx, 0);
         AlterIsrRequestData.PartitionData shrinkIsrRequest = newAlterIsrPartition(
             replicationControl, topicIdPartition, Arrays.asList(0, 1));
         ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
             replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
         AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
-            shrinkIsrResult, topicPartition, Errors.NONE);
+            shrinkIsrResult, topicPartition, NONE);
         assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
 
         AlterIsrRequestData.PartitionData expandIsrRequest = newAlterIsrPartition(
@@ -240,16 +258,17 @@ public class ReplicationControlManagerTest {
         ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
             replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
         AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
-            expandIsrResult, topicPartition, Errors.NONE);
+            expandIsrResult, topicPartition, NONE);
         assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, expandIsrResponse);
     }
 
     @Test
     public void testInvalidAlterIsrRequests() throws Exception {
-        ReplicationControlManager replicationControl = newReplicationControlManager();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
         for (int i = 0; i < 3; i++) {
-            registerBroker(i, replicationControl);
-            unfenceBroker(i, replicationControl);
+            registerBroker(i, ctx);
+            unfenceBroker(i, ctx);
         }
         CreatableTopicResult createTopicResult = createTestTopic(replicationControl, "foo",
             new int[][] {new int[] {0, 1, 2}});
@@ -257,13 +276,13 @@ public class ReplicationControlManagerTest {
         TopicIdPartition topicIdPartition = new TopicIdPartition(createTopicResult.topicId(), 0);
         TopicPartition topicPartition = new TopicPartition("foo", 0);
         assertEquals(OptionalInt.of(0), currentLeader(replicationControl, topicIdPartition));
-        long brokerEpoch = currentBrokerEpoch(replicationControl, 0);
+        long brokerEpoch = currentBrokerEpoch(ctx, 0);
 
         // Invalid leader
         AlterIsrRequestData.PartitionData invalidLeaderRequest = newAlterIsrPartition(
             replicationControl, topicIdPartition, Arrays.asList(0, 1));
         ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
-            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            replicationControl, 1, currentBrokerEpoch(ctx, 1),
             "foo", invalidLeaderRequest);
         assertAlterIsrResponse(invalidLeaderResult, topicPartition, Errors.INVALID_REQUEST);
 
@@ -278,7 +297,7 @@ public class ReplicationControlManagerTest {
             replicationControl, topicIdPartition, Arrays.asList(0, 1));
         invalidLeaderEpochRequest.setLeaderEpoch(500);
         ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
-            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            replicationControl, 1, currentBrokerEpoch(ctx, 1),
             "foo", invalidLeaderEpochRequest);
         assertAlterIsrResponse(invalidLeaderEpochResult, topicPartition, Errors.INVALID_REQUEST);
 
@@ -287,7 +306,7 @@ public class ReplicationControlManagerTest {
             replicationControl, topicIdPartition, Arrays.asList(0, 1));
         invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
         ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
-            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            replicationControl, 1, currentBrokerEpoch(ctx, 1),
             "foo", invalidIsrRequest1);
         assertAlterIsrResponse(invalidIsrResult1, topicPartition, Errors.INVALID_REQUEST);
 
@@ -296,16 +315,16 @@ public class ReplicationControlManagerTest {
             replicationControl, topicIdPartition, Arrays.asList(0, 1));
         invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
         ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
-            replicationControl, 1, currentBrokerEpoch(replicationControl, 1),
+            replicationControl, 1, currentBrokerEpoch(ctx, 1),
             "foo", invalidIsrRequest2);
         assertAlterIsrResponse(invalidIsrResult2, topicPartition, Errors.INVALID_REQUEST);
     }
 
     private long currentBrokerEpoch(
-        ReplicationControlManager replicationControl,
+        ReplicationControlTestContext ctx,
         int brokerId
     ) {
-        Map<Integer, BrokerRegistration> registrations = replicationControl.clusterControl.brokerRegistrations();
+        Map<Integer, BrokerRegistration> registrations = ctx.clusterControl.brokerRegistrations();
         BrokerRegistration registration = registrations.get(brokerId);
         assertNotNull(registration, "No current registration for broker " + brokerId);
         return registration.epoch();
@@ -391,4 +410,100 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedIsr, partitionData.isr());
     }
 
+    private void assertCreatedTopicConfigs(
+        ReplicationControlTestContext ctx,
+        String topic,
+        CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs
+    ) {
+        Map<String, String> configs = ctx.configurationControl.getConfigs(
+            new ConfigResource(ConfigResource.Type.TOPIC, topic));
+        assertEquals(requestConfigs.size(), configs.size());
+        for (CreateTopicsRequestData.CreateableTopicConfig requestConfig : requestConfigs) {
+            String value = configs.get(requestConfig.name());
+            assertEquals(requestConfig.value(), value);
+        }
+    }
+
+    private void assertEmptyTopicConfigs(
+        ReplicationControlTestContext ctx,
+        String topic
+    ) {
+        Map<String, String> configs = ctx.configurationControl.getConfigs(
+            new ConfigResource(ConfigResource.Type.TOPIC, topic));
+        assertEquals(Collections.emptyMap(), configs);
+    }
+
+    @Test
+    public void testDeleteTopics() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        CreateTopicsRequestData.CreateableTopicConfigCollection requestConfigs =
+            new CreateTopicsRequestData.CreateableTopicConfigCollection();
+        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().
+            setName("cleanup.policy").setValue("compact"));
+        requestConfigs.add(new CreateTopicsRequestData.CreateableTopicConfig().
+            setName("min.cleanable.dirty.ratio").setValue("0.1"));
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(3).setReplicationFactor((short) 2).
+            setConfigs(requestConfigs));
+        registerBroker(0, ctx);
+        unfenceBroker(0, ctx);
+        registerBroker(1, ctx);
+        unfenceBroker(1, ctx);
+        ControllerResult<CreateTopicsResponseData> result =
+            replicationControl.createTopics(request);
+        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
+        Uuid topicId = result.response().topics().find("foo").topicId();
+        expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
+            setNumPartitions(3).setReplicationFactor((short) 2).
+            setErrorMessage(null).setErrorCode((short) 0).
+            setTopicId(topicId));
+        assertEquals(expectedResponse, result.response());
+        // Until the records are replayed, no changes are made to the in-memory state.
+        assertNull(replicationControl.getPartition(topicId, 0));
+        assertEmptyTopicConfigs(ctx, "foo");
+        ctx.replay(result.records());
+        assertNotNull(replicationControl.getPartition(topicId, 0));
+        assertNotNull(replicationControl.getPartition(topicId, 1));
+        assertNotNull(replicationControl.getPartition(topicId, 2));
+        assertNull(replicationControl.getPartition(topicId, 3));
+        assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
+
+        assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")),
+            replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
+        assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)),
+            replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
+        Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
+            topicId.getLeastSignificantBits());
+        assertEquals(Collections.singletonMap(invalidId,
+            new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
+                replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
+        assertEquals(Collections.singletonMap("bar",
+            new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
+                replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
+
+        ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl.
+            deleteTopics(Collections.singletonList(invalidId));
+        assertEquals(0, result1.records().size());
+        assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
+            result1.response());
+        ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl.
+            deleteTopics(Collections.singletonList(topicId));
+        assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
+            result2.response());
+        assertEquals(1, result2.records().size());
+        ctx.replay(result2.records());
+        assertNull(replicationControl.getPartition(topicId, 0));
+        assertNull(replicationControl.getPartition(topicId, 1));
+        assertNull(replicationControl.getPartition(topicId, 2));
+        assertNull(replicationControl.getPartition(topicId, 3));
+        assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(
+            new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
+                Long.MAX_VALUE, Collections.singleton(topicId)));
+        assertEquals(Collections.singletonMap("foo", new ResultOrError<>(
+            new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
+                Long.MAX_VALUE, Collections.singleton("foo")));
+        assertEmptyTopicConfigs(ctx, "foo");
+    }
 }
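
A small detail in the negative-lookup assertions above: the test derives its nonexistent topic ID by adding one to the most significant bits of the real ID, which guarantees a distinct, unknown ID rather than relying on a randomly generated UUID happening not to collide with the created topic.
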
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java b/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
index bb5fe9d..19edceb 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
@@ -96,6 +96,8 @@ public class TimelineHashMapTest {
         assertEquals("xyz", map.putIfAbsent(1, "ghi"));
         map.putAll(Collections.singletonMap(2, "b"));
         assertTrue(map.containsKey(2));
+        assertEquals("xyz", map.remove(1));
+        assertEquals("b", map.remove(2));
     }
 
     @Test