Posted to commits@kafka.apache.org by cm...@apache.org on 2022/04/15 23:07:32 UTC

[kafka] branch trunk updated: KAFKA-13807: Fix incrementalAlterConfig and refactor some things (#12033)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1521813a3a KAFKA-13807: Fix incrementalAlterConfig and refactor some things (#12033)
1521813a3a is described below

commit 1521813a3a9710011b1ac7526ad3379f809032ca
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri Apr 15 16:07:23 2022 -0700

    KAFKA-13807: Fix incrementalAlterConfig and refactor some things (#12033)
    
    Ensure that we can set log.flush.interval.ms at the broker or cluster level via
    IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
    second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.
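
    For illustration, here is a minimal, self-contained sketch of why synonym order matters
    (hypothetical names, not Kafka's actual resolution code): the first synonym in the list
    that has a value wins, so log.flush.interval.ms must come before
    log.flush.scheduler.interval.ms.

        import java.util.List;
        import java.util.Map;
        import java.util.Optional;

        public class SynonymOrderSketch {
            // Ordered broker-level synonyms for the topic-level flush.ms config.
            // The first entry that is set takes precedence.
            static final List<String> FLUSH_MS_SYNONYMS = List.of(
                "log.flush.interval.ms",           // now first (this fix)
                "log.flush.scheduler.interval.ms"  // was first (the regression)
            );

            static Optional<String> resolveFlushMs(Map<String, String> brokerConfigs) {
                for (String synonym : FLUSH_MS_SYNONYMS) {
                    String value = brokerConfigs.get(synonym);
                    if (value != null) {
                        return Optional.of(value);
                    }
                }
                return Optional.empty();
            }
        }

    With the old ordering, a dynamically set log.flush.interval.ms could be shadowed by
    log.flush.scheduler.interval.ms, so the alteration appeared to have no effect.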
    
    Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
    pass through information such as the deadline (if there is one) and the Kafka principal making the
    request (in the future, we will want to log this information).
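
    For reference, a rough sketch of the shape of ControllerRequestContext as it is used in
    this patch (the accessor names are assumptions; the real class is in
    metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java):

        import java.util.OptionalLong;
        import java.util.concurrent.TimeUnit;
        import org.apache.kafka.common.security.auth.KafkaPrincipal;
        import org.apache.kafka.common.utils.Time;

        public class ControllerRequestContext {
            private final KafkaPrincipal principal;
            private final OptionalLong deadlineNs;

            // Convert a relative request timeout in milliseconds into an absolute
            // deadline in monotonic nanoseconds, as the ControllerApis handlers do.
            public static OptionalLong requestTimeoutMsToDeadlineNs(Time time, int timeoutMs) {
                return OptionalLong.of(time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(timeoutMs));
            }

            public ControllerRequestContext(KafkaPrincipal principal, OptionalLong deadlineNs) {
                this.principal = principal;
                this.deadlineNs = deadlineNs;
            }

            public KafkaPrincipal principal() {
                return principal;
            }

            public OptionalLong deadlineNs() {
                return deadlineNs;
            }
        }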
    
    In ControllerApis, enforce a timeout for broker heartbeat requests equal to the heartbeat request
    interval, to avoid heartbeats piling up on the controller queue. This should have been done
    previously, but was overlooked.
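
    In code, the heartbeat handler now derives its deadline from the configured interval
    rather than from a client-supplied timeout. A Java rendering of the Scala change in
    ControllerApis (principal, time, and brokerHeartbeatIntervalMs assumed in scope):

        // A heartbeat that has waited longer than one heartbeat interval on the
        // controller queue is stale: the broker will already have sent a newer one,
        // so the old request can safely be expired at this deadline.
        ControllerRequestContext context = new ControllerRequestContext(
            principal,
            ControllerRequestContext.requestTimeoutMsToDeadlineNs(time, brokerHeartbeatIntervalMs));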
    
    Add builders for ClusterControlManager and ReplicationControlManager to avoid a lot of churn
    (especially in test code) whenever a new constructor parameter is added to one of them.
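
    Schematically, the builder looks like this (a hypothetical, trimmed-down sketch; the
    real builders carry many more fields):

        import org.apache.kafka.common.utils.LogContext;
        import org.apache.kafka.common.utils.Time;

        // Stand-in for ClusterControlManager, to keep the sketch self-contained;
        // the real constructor takes many more parameters.
        class Manager {
            Manager(LogContext logContext, Time time) { }

            public static class Builder {
                private LogContext logContext = new LogContext();
                private Time time = Time.SYSTEM;

                public Builder setLogContext(LogContext logContext) {
                    this.logContext = logContext;
                    return this;
                }

                public Builder setTime(Time time) {
                    this.time = time;
                    return this;
                }

                // Every field has a default, so adding a constructor parameter
                // later only touches the builder, not every call site.
                public Manager build() {
                    return new Manager(logContext, time);
                }
            }
        }

    A test can then write new Manager.Builder().setTime(mockTime).build() and ignore the
    parameters it does not care about.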
    
    In ControllerConfigurationValidator, create a separate function for the case where we just want to
    validate that a ConfigResource is a valid target for DescribeConfigs. Previously we re-used the
    validation code for IncrementalAlterConfigs, which was messy.
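
    At the interface level, the split looks roughly like this (compare
    ConfigurationValidator.java in this diff; javadoc omitted):

        import java.util.Map;
        import org.apache.kafka.common.config.ConfigResource;

        public interface ConfigurationValidator {
            // New overload: check only that the resource itself is a legal target.
            // This is all DescribeConfigs needs.
            void validate(ConfigResource resource);

            // Existing overload: check the resource and the proposed values.
            // Used by IncrementalAlterConfigs.
            void validate(ConfigResource resource, Map<String, String> config);
        }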
    
    Split out the replica placement code into a separate package and reorganize it a bit.
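
    A hedged sketch of the reorganized placement interfaces (the method shapes are inferred
    from the new file names and from the usableBrokers() change in BrokerHeartbeatManager
    below, so treat the details as assumptions):

        package org.apache.kafka.metadata.placement;

        import java.util.Iterator;
        import java.util.List;

        // Abstracts "which brokers can host replicas" away from the controller
        // internals; BrokerHeartbeatManager now just exposes an iterator of
        // usable brokers instead of doing placement itself.
        interface ClusterDescriber {
            Iterator<UsableBroker> usableBrokers();
        }

        // Bundles what used to be loose parameters to placeReplicas(): the
        // starting partition, the partition count, and the replication factor.
        // (Sketch only; the real PlacementSpec may differ.)
        final class PlacementSpec {
            final int startPartition;
            final int numPartitions;
            final short numReplicas;

            PlacementSpec(int startPartition, int numPartitions, short numReplicas) {
                this.startPartition = startPartition;
                this.numPartitions = numPartitions;
                this.numReplicas = numReplicas;
            }
        }

        interface ReplicaPlacer {
            // Returns one replica list per partition to be placed.
            List<List<Integer>> place(PlacementSpec placement, ClusterDescriber cluster);
        }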
    
    Reviewers: David Arthur <mumrah@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 core/src/main/scala/kafka/log/LogConfig.scala      |   4 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 105 +++++---
 .../server/ControllerConfigurationValidator.scala  |  71 ++++--
 core/src/test/java/kafka/test/MockController.java  | 122 +++++++---
 .../kafka/api/AuthorizerIntegrationTest.scala      |  13 +-
 .../kafka/integration/KafkaServerTestHarness.scala |   7 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  30 ++-
 .../ControllerConfigurationValidatorTest.scala     |  10 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  17 +-
 .../kafka/controller/BrokerHeartbeatManager.java   |  30 +--
 .../kafka/controller/ClusterControlManager.java    | 106 +++++++-
 .../controller/ConfigurationControlManager.java    |   6 +-
 .../kafka/controller/ConfigurationValidator.java   |  15 +-
 .../org/apache/kafka/controller/Controller.java    | 138 ++++++++---
 .../kafka/controller/ControllerRequestContext.java |  60 +++++
 .../apache/kafka/controller/QuorumController.java  | 270 +++++++++++++--------
 .../controller/ReplicationControlManager.java      | 140 +++++++++--
 .../kafka/metadata/authorizer/AclMutator.java      |  14 +-
 .../authorizer/ClusterMetadataAuthorizer.java      |  10 +-
 .../metadata/placement/ClusterDescriber.java}      |  22 +-
 .../kafka/metadata/placement/PlacementSpec.java    |  83 +++++++
 .../placement}/ReplicaPlacer.java                  |  25 +-
 .../placement}/StripedReplicaPlacer.java           |  24 +-
 .../metadata/{ => placement}/UsableBroker.java     |  15 +-
 .../controller/BrokerHeartbeatManagerTest.java     |   2 +-
 .../controller/ClusterControlManagerTest.java      |  66 +++--
 .../controller/ProducerIdControlManagerTest.java   |  16 +-
 .../kafka/controller/QuorumControllerTest.java     | 132 +++++-----
 .../controller/ReplicationControlManagerTest.java  |  37 +--
 .../kafka/{controller => metadata}/MockRandom.java |   2 +-
 .../authorizer/ClusterMetadataAuthorizerTest.java  |  11 +-
 .../placement}/StripedReplicaPlacerTest.java       |  52 ++--
 34 files changed, 1167 insertions(+), 497 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 070d22c14c..414e59a614 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -271,6 +271,7 @@
       <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.requests" />
       <allow pkg="org.apache.kafka.common.resource" />
+      <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
     </subpackage>
   </subpackage>
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 6e3cf459c3..e0a2cb463e 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -473,8 +473,8 @@ object LogConfig {
     FlushMessagesProp -> asList(
       new ConfigSynonym(KafkaConfig.LogFlushIntervalMessagesProp)),
     FlushMsProp -> asList(
-      new ConfigSynonym(KafkaConfig.LogFlushSchedulerIntervalMsProp),
-      new ConfigSynonym(KafkaConfig.LogFlushIntervalMsProp)),
+      new ConfigSynonym(KafkaConfig.LogFlushIntervalMsProp),
+      new ConfigSynonym(KafkaConfig.LogFlushSchedulerIntervalMsProp)),
     RetentionBytesProp -> asList(
       new ConfigSynonym(KafkaConfig.LogRetentionBytesProp)),
     RetentionMsProp -> asList(
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index e4b40bc603..064974b7db 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -18,9 +18,8 @@
 package kafka.server
 
 import java.util
-import java.util.Collections
+import java.util.{Collections, OptionalLong}
 import java.util.Map.Entry
-import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
 import java.util.concurrent.{CompletableFuture, ExecutionException}
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
@@ -46,7 +45,8 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, Uuid}
-import org.apache.kafka.controller.Controller
+import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
+import org.apache.kafka.controller.{Controller, ControllerRequestContext}
 import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -153,7 +153,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleDeleteTopics(request: RequestChannel.Request): Unit = {
     val deleteTopicsRequest = request.body[DeleteTopicsRequest]
-    val future = deleteTopics(deleteTopicsRequest.data,
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, deleteTopicsRequest.data().timeoutMs()))
+    val future = deleteTopics(context,
+      deleteTopicsRequest.data,
       request.context.apiVersion,
       authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
       names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, names)(n => n),
@@ -172,12 +175,14 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def deleteTopics(request: DeleteTopicsRequestData,
-                   apiVersion: Int,
-                   hasClusterAuth: Boolean,
-                   getDescribableTopics: Iterable[String] => Set[String],
-                   getDeletableTopics: Iterable[String] => Set[String])
-                   : CompletableFuture[util.List[DeletableTopicResult]] = {
+  def deleteTopics(
+    context: ControllerRequestContext,
+    request: DeleteTopicsRequestData,
+    apiVersion: Int,
+    hasClusterAuth: Boolean,
+    getDescribableTopics: Iterable[String] => Set[String],
+    getDeletableTopics: Iterable[String] => Set[String]
+  ): CompletableFuture[util.List[DeletableTopicResult]] = {
     // Check if topic deletion is enabled at all.
     if (!config.deleteTopicEnable) {
       if (apiVersion < 3) {
@@ -186,7 +191,6 @@ class ControllerApis(val requestChannel: RequestChannel,
         throw new TopicDeletionDisabledException()
       }
     }
-    val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
     // The first step is to load up the names and IDs that have been provided by the
     // request.  This is a bit messy because we support multiple ways of referring to
     // topics (both by name and by id) and because we need to check for duplicates or
@@ -239,7 +243,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(deadlineNs, providedIds).thenCompose { topicNames =>
+    controller.findTopicNames(context, providedIds).thenCompose { topicNames =>
       topicNames.forEach { (id, nameOrError) =>
         if (nameOrError.isError) {
           appendResponse(null, id, nameOrError.error())
@@ -274,7 +278,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       }
       // For each topic that was provided by name, check if authentication failed.
       // If so, create an error response for it. Otherwise, add it to the idToName map.
-      controller.findTopicIds(deadlineNs, providedNames).thenCompose { topicIds =>
+      controller.findTopicIds(context, providedNames).thenCompose { topicIds =>
         topicIds.forEach { (name, idOrError) =>
           if (!describeable.contains(name)) {
             appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
@@ -298,7 +302,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
         // Finally, the idToName map contains all the topics that we are authorized to delete.
         // Perform the deletion and create responses for each one.
-        controller.deleteTopics(deadlineNs, idToName.keySet).thenApply { idToError =>
+        controller.deleteTopics(context, idToName.keySet).thenApply { idToError =>
           idToError.forEach { (id, error) =>
             appendResponse(idToName.get(id), id, error)
           }
@@ -313,7 +317,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
     val createTopicsRequest = request.body[CreateTopicsRequest]
-    val future = createTopics(createTopicsRequest.data(),
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, createTopicsRequest.data().timeoutMs()))
+    val future = createTopics(context,
+        createTopicsRequest.data(),
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity),
         names => authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
@@ -331,6 +338,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def createTopics(
+    context: ControllerRequestContext,
     request: CreateTopicsRequestData,
     hasClusterAuth: Boolean,
     getCreatableTopics: Iterable[String] => Set[String],
@@ -361,7 +369,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    controller.createTopics(effectiveRequest, describableTopicNames).thenApply { response =>
+    controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response =>
       duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
@@ -421,6 +429,7 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleLegacyAlterConfigs(request: RequestChannel.Request): Unit = {
     val response = new AlterConfigsResponseData()
     val alterConfigsRequest = request.body[AlterConfigsRequest]
+    val context = new ControllerRequestContext(request.context.principal, OptionalLong.empty())
     val duplicateResources = new util.HashSet[ConfigResource]
     val configChanges = new util.HashMap[ConfigResource, util.Map[String, String]]()
     alterConfigsRequest.data.resources.forEach { resource =>
@@ -459,7 +468,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    controller.legacyAlterConfigs(configChanges, alterConfigsRequest.data.validateOnly)
+    controller.legacyAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
       .whenComplete { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -498,9 +507,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleElectLeaders(request: RequestChannel.Request): Unit = {
     authHelper.authorizeClusterOperation(request, ALTER)
-
     val electLeadersRequest = request.body[ElectLeadersRequest]
-    val future = controller.electLeaders(electLeadersRequest.data)
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, electLeadersRequest.data().timeoutMs()))
+    val future = controller.electLeaders(context, electLeadersRequest.data)
     future.whenComplete { (responseData, exception) =>
       if (exception != null) {
         requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
@@ -516,8 +526,10 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleAlterPartitionRequest(request: RequestChannel.Request): Unit = {
     val alterPartitionRequest = request.body[AlterPartitionRequest]
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    val future = controller.alterPartition(alterPartitionRequest.data)
+    val future = controller.alterPartition(context, alterPartitionRequest.data)
     future.whenComplete { (result, exception) =>
       val response = if (exception != null) {
         alterPartitionRequest.getErrorResponse(exception)
@@ -531,8 +543,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
     val heartbeatRequest = request.body[BrokerHeartbeatRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
-    controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit] { (reply, e) =>
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, config.brokerHeartbeatIntervalMs))
+    controller.processBrokerHeartbeat(context, heartbeatRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerHeartbeatReply,
                                  e: Throwable): BrokerHeartbeatResponse = {
@@ -557,8 +570,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
     val decommissionRequest = request.body[UnregisterBrokerRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
 
-    controller.unregisterBroker(decommissionRequest.data().brokerId()).handle[Unit] { (_, e) =>
+    controller.unregisterBroker(context, decommissionRequest.data().brokerId()).handle[Unit] { (_, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  e: Throwable): UnregisterBrokerResponse = {
         if (e != null) {
@@ -578,8 +593,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
     val registrationRequest = request.body[BrokerRegistrationRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
 
-    controller.registerBroker(registrationRequest.data).handle[Unit] { (reply, e) =>
+    controller.registerBroker(context, registrationRequest.data).handle[Unit] { (reply, e) =>
       def createResponseCallback(requestThrottleMs: Int,
                                  reply: BrokerRegistrationReply,
                                  e: Throwable): BrokerRegistrationResponse = {
@@ -617,7 +634,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
     val quotaRequest = request.body[AlterClientQuotasRequest]
     authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
-    controller.alterClientQuotas(quotaRequest.entries, quotaRequest.validateOnly)
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
+    controller.alterClientQuotas(context, quotaRequest.entries, quotaRequest.validateOnly)
       .whenComplete { (results, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -631,6 +650,8 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
     val response = new IncrementalAlterConfigsResponseData()
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
     val duplicateResources = new util.HashSet[ConfigResource]
     val configChanges = new util.HashMap[ConfigResource,
       util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
@@ -673,7 +694,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data.validateOnly)
+    controller.incrementalAlterConfigs(context, configChanges, alterConfigsRequest.data.validateOnly)
       .whenComplete { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -694,12 +715,12 @@ class ControllerApis(val requestChannel: RequestChannel,
     def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
       authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
     }
-
-    val future = createPartitions(
-      request.body[CreatePartitionsRequest].data,
-      filterAlterAuthorizedTopics
-    )
-
+    val createPartitionsRequest = request.body[CreatePartitionsRequest]
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, createPartitionsRequest.data().timeoutMs()))
+    val future = createPartitions(context,
+      createPartitionsRequest.data(),
+      filterAlterAuthorizedTopics)
     future.whenComplete { (responses, exception) =>
       if (exception != null) {
         requestHelper.handleError(request, exception)
@@ -715,10 +736,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def createPartitions(
+    context: ControllerRequestContext,
     request: CreatePartitionsRequestData,
     getAlterAuthorizedTopics: Iterable[String] => Set[String]
   ): CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
-    val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
     val responses = new util.ArrayList[CreatePartitionsTopicResult]()
     val duplicateTopicNames = new util.HashSet[String]()
     val topicNames = new util.HashSet[String]()
@@ -746,7 +767,7 @@ class ControllerApis(val requestChannel: RequestChannel,
           setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
       }
     }
-    controller.createPartitions(deadlineNs, topics).thenApply { results =>
+    controller.createPartitions(context, topics).thenApply { results =>
       results.forEach(response => responses.add(response))
       responses
     }
@@ -755,7 +776,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAlterPartitionReassignments(request: RequestChannel.Request): Unit = {
     val alterRequest = request.body[AlterPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
-    val response = controller.alterPartitionReassignments(alterRequest.data()).get()
+    val context = new ControllerRequestContext(request.context.principal,
+      requestTimeoutMsToDeadlineNs(time, alterRequest.data().timeoutMs()))
+    val response = controller.alterPartitionReassignments(context, alterRequest.data()).get()
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new AlterPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
@@ -763,7 +786,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleListPartitionReassignments(request: RequestChannel.Request): Unit = {
     val listRequest = request.body[ListPartitionReassignmentsRequest]
     authHelper.authorizeClusterOperation(request, DESCRIBE)
-    val response = controller.listPartitionReassignments(listRequest.data()).get()
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
+    val response = controller.listPartitionReassignments(context, listRequest.data()).get()
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
@@ -771,7 +796,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
     val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    controller.allocateProducerIds(allocatedProducerIdsRequest.data)
+    val context = new ControllerRequestContext(request.context.principal,
+        OptionalLong.empty())
+    controller.allocateProducerIds(context, allocatedProducerIdsRequest.data)
       .whenComplete((results, exception) => {
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -787,7 +814,9 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
     val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
     authHelper.authorizeClusterOperation(request, ALTER)
-    controller.updateFeatures(updateFeaturesRequest.data)
+    val context = new ControllerRequestContext(request.context.principal,
+      OptionalLong.empty())
+    controller.updateFeatures(context, updateFeaturesRequest.data)
       .whenComplete((response, exception) => {
         if (exception != null) {
           requestHelper.handleError(request, exception)
diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 5cc075ef04..c3c77e1203 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -44,13 +44,56 @@ import scala.collection.mutable
  * as the others. It is not persisted to the metadata log (or to ZK, when we're in that mode).
  */
 class ControllerConfigurationValidator extends ConfigurationValidator {
-  override def validate(resource: ConfigResource, config: util.Map[String, String]): Unit = {
+  private def validateTopicName(
+    name: String
+  ): Unit = {
+    if (name.isEmpty()) {
+      throw new InvalidRequestException("Default topic resources are not allowed.")
+    }
+    Topic.validate(name)
+  }
+
+  private def validateBrokerName(
+    name: String
+  ): Unit = {
+    if (!name.isEmpty()) {
+      val brokerId = try {
+        Integer.valueOf(name)
+      } catch {
+        case _: NumberFormatException =>
+          throw new InvalidRequestException("Unable to parse broker name as a base 10 number.")
+      }
+      if (brokerId < 0) {
+        throw new InvalidRequestException("Invalid negative broker ID.")
+      }
+    }
+  }
+
+  private def throwExceptionForUnknownResourceType(
+    resource: ConfigResource
+  ): Unit = {
+    // Note: we should never handle BROKER_LOGGER resources here, since changes to
+    // those resources are not persisted in the metadata.
+    throw new InvalidRequestException(s"Unknown resource type ${resource.`type`}")
+  }
+
+  override def validate(
+    resource: ConfigResource
+  ): Unit = {
+    resource.`type`() match {
+      case TOPIC => validateTopicName(resource.name())
+      case BROKER => validateBrokerName(resource.name())
+      case _ => throwExceptionForUnknownResourceType(resource)
+    }
+  }
+
+  override def validate(
+    resource: ConfigResource,
+    config: util.Map[String, String]
+  ): Unit = {
     resource.`type`() match {
       case TOPIC =>
-        if (resource.name().isEmpty()) {
-          throw new InvalidRequestException("Default topic resources are not allowed.")
-        }
-        Topic.validate(resource.name())
+        validateTopicName(resource.name())
         val properties = new Properties()
         val nullTopicConfigs = new mutable.ArrayBuffer[String]()
         config.entrySet().forEach(e => {
@@ -65,22 +108,8 @@ class ControllerConfigurationValidator extends ConfigurationValidator {
             nullTopicConfigs.mkString(","))
         }
         LogConfig.validate(properties)
-      case BROKER =>
-        if (resource.name().nonEmpty) {
-          val brokerId = try {
-            Integer.valueOf(resource.name())
-          } catch {
-            case _: NumberFormatException =>
-              throw new InvalidRequestException("Unable to parse broker name as a base 10 number.")
-          }
-          if (brokerId < 0) {
-            throw new InvalidRequestException("Invalid negative broker ID.")
-          }
-        }
-      case _ =>
-        // Note: we should never handle BROKER_LOGGER resources here, since changes to
-        // those resources are not persisted in the metadata.
-        throw new InvalidRequestException(s"Unknown resource type ${resource.`type`}")
+      case BROKER => validateBrokerName(resource.name())
+      case _ => throwExceptionForUnknownResourceType(resource)
     }
   }
 }
\ No newline at end of file
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index d3bda07361..f6bfd207d4 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.controller.ResultOrError;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -78,12 +79,18 @@ public class MockController implements Controller {
     private final AtomicLong nextTopicId = new AtomicLong(1);
 
     @Override
-    public CompletableFuture<List<AclCreateResult>> createAcls(List<AclBinding> aclBindings) {
+    public CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> aclBindingFilters) {
+    public CompletableFuture<List<AclDeleteResult>> deleteAcls(
+        ControllerRequestContext context,
+        List<AclBindingFilter> aclBindingFilters
+    ) {
         throw new UnsupportedOperationException();
     }
 
@@ -110,13 +117,19 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+    public CompletableFuture<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    synchronized public CompletableFuture<CreateTopicsResponseData>
-            createTopics(CreateTopicsRequestData request, Set<String> describable) {
+    synchronized public CompletableFuture<CreateTopicsResponseData> createTopics(
+        ControllerRequestContext context,
+        CreateTopicsRequestData request,
+        Set<String> describable
+    ) {
         CreateTopicsResponseData response = new CreateTopicsResponseData();
         for (CreatableTopic topic : request.topics()) {
             if (topicNameToId.containsKey(topic.name())) {
@@ -158,7 +171,10 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<Void> unregisterBroker(int brokerId) {
+    public CompletableFuture<Void> unregisterBroker(
+        ControllerRequestContext context,
+        int brokerId
+    ) {
         throw new UnsupportedOperationException();
     }
 
@@ -179,8 +195,10 @@ public class MockController implements Controller {
     private final Map<ConfigResource, Map<String, String>> configs = new HashMap<>();
 
     @Override
-    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>>
-            findTopicIds(long deadlineNs, Collection<String> topicNames) {
+    synchronized public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(
+        ControllerRequestContext context,
+        Collection<String> topicNames
+    ) {
         Map<String, ResultOrError<Uuid>> results = new HashMap<>();
         for (String topicName : topicNames) {
             if (!topicNameToId.containsKey(topicName)) {
@@ -193,7 +211,9 @@ public class MockController implements Controller {
     }
 
     @Override
-    synchronized public CompletableFuture<Map<String, Uuid>> findAllTopicIds(long deadlineNs) {
+    synchronized public CompletableFuture<Map<String, Uuid>> findAllTopicIds(
+        ControllerRequestContext context
+    ) {
         Map<String, Uuid> results = new HashMap<>();
         for (Entry<Uuid, MockTopic> entry : topics.entrySet()) {
             results.put(entry.getValue().name, entry.getKey());
@@ -202,8 +222,10 @@ public class MockController implements Controller {
     }
 
     @Override
-    synchronized public CompletableFuture<Map<Uuid, ResultOrError<String>>>
-            findTopicNames(long deadlineNs, Collection<Uuid> topicIds) {
+    synchronized public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(
+        ControllerRequestContext context,
+        Collection<Uuid> topicIds
+    ) {
         Map<Uuid, ResultOrError<String>> results = new HashMap<>();
         for (Uuid topicId : topicIds) {
             MockTopic topic = topics.get(topicId);
@@ -217,8 +239,10 @@ public class MockController implements Controller {
     }
 
     @Override
-    synchronized public CompletableFuture<Map<Uuid, ApiError>>
-            deleteTopics(long deadlineNs, Collection<Uuid> topicIds) {
+    synchronized public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(
+        ControllerRequestContext context,
+        Collection<Uuid> topicIds
+    ) {
         if (!active) {
             CompletableFuture<Map<Uuid, ApiError>> future = new CompletableFuture<>();
             future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
@@ -238,24 +262,34 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(Map<ConfigResource, Collection<String>> resources) {
+    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(
+        ControllerRequestContext context,
+        Map<ConfigResource, Collection<String>> resources
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
+    public CompletableFuture<ElectLeadersResponseData> electLeaders(
+        ControllerRequestContext context,
+        ElectLeadersRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
+    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
+        ControllerRequestContext context
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
-            Map<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> configChanges,
-            boolean validateOnly) {
+        ControllerRequestContext context,
+        Map<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> configChanges,
+        boolean validateOnly
+    ) {
         Map<ConfigResource, ApiError> results = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> entry :
                 configChanges.entrySet()) {
@@ -295,20 +329,27 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<AlterPartitionReassignmentsResponseData>
-            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
+        ControllerRequestContext context,
+        AlterPartitionReassignmentsRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<ListPartitionReassignmentsResponseData>
-            listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
+    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(
+        ControllerRequestContext context,
+        ListPartitionReassignmentsRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
-            Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+        ControllerRequestContext context,
+        Map<ConfigResource, Map<String, String>> newConfigs,
+        boolean validateOnly
+    ) {
         Map<ConfigResource, ApiError> results = new HashMap<>();
         if (!validateOnly) {
             for (Entry<ConfigResource, Map<String, String>> entry : newConfigs.entrySet()) {
@@ -324,14 +365,18 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<BrokerHeartbeatReply>
-            processBrokerHeartbeat(BrokerHeartbeatRequestData request) {
+    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
+        ControllerRequestContext context,
+        BrokerHeartbeatRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<BrokerRegistrationReply>
-            registerBroker(BrokerRegistrationRequestData request) {
+    public CompletableFuture<BrokerRegistrationReply> registerBroker(
+        ControllerRequestContext context,
+        BrokerRegistrationRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
@@ -341,24 +386,35 @@ public class MockController implements Controller {
     }
 
     @Override
-    public CompletableFuture<Map<ClientQuotaEntity, ApiError>>
-            alterClientQuotas(Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
+        ControllerRequestContext context,
+        Collection<ClientQuotaAlteration> quotaAlterations,
+        boolean validateOnly
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(AllocateProducerIdsRequestData request) {
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+        ControllerRequestContext context,
+        AllocateProducerIdsRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(UpdateFeaturesRequestData request) {
+    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
+        ControllerRequestContext context,
+        UpdateFeaturesRequestData request
+    ) {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {
+    synchronized public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
+        ControllerRequestContext context,
+        List<CreatePartitionsTopic> topicList
+    ) {
         if (!active) {
             CompletableFuture<List<CreatePartitionsTopicResult>> future = new CompletableFuture<>();
             future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ef1b8d96fb..76b5660586 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -2510,13 +2510,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY)
     val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, aclEntryFilter)
 
-    authorizerForWrite.deleteAcls(null, List(aclFilter).asJava).asScala.map(_.toCompletableFuture.get).flatMap { deletion =>
-      deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
-    }.foreach { resource =>
-      (brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach { authorizer =>
-        TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, resource, aclEntryFilter)
+    authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, List(aclFilter).asJava).asScala.
+      map(_.toCompletableFuture.get).flatMap { deletion =>
+        deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
+      }.foreach { resource =>
+        (brokers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach { authorizer =>
+          TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, resource, aclEntryFilter)
+        }
       }
-    }
   }
 
   private def sendRequestAndVerifyResponseError(request: AbstractRequest,
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index b502d4863f..dda1ac3347 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT
 
 /**
  * A test harness that brings up some number of broker nodes
@@ -289,7 +290,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
     val result = new util.HashMap[String, Uuid]()
     if (isKRaftTest()) {
-      val topicIdsMap = controllerServer.controller.findTopicIds(Long.MaxValue, names.asJava).get()
+      val topicIdsMap = controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
       names.foreach { name =>
         val response = topicIdsMap.get(name)
         result.put(name, response.result())
@@ -305,7 +306,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
 
   def getTopicIds(): Map[String, Uuid] = {
     if (isKRaftTest()) {
-      controllerServer.controller.findAllTopicIds(Long.MaxValue).get().asScala.toMap
+      controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
     } else {
       getController().kafkaController.controllerContext.topicIds.toMap
     }
@@ -314,7 +315,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   def getTopicNames(): Map[Uuid, String] = {
     if (isKRaftTest()) {
       val result = new util.HashMap[Uuid, String]()
-      controllerServer.controller.findAllTopicIds(Long.MaxValue).get().entrySet().forEach {
+      controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach {
         e => result.put(e.getValue(), e.getKey())
       }
       result.asScala.toMap
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 5fcf763d7f..d28ee38db6 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -53,7 +53,8 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{ElectionType, Uuid}
-import org.apache.kafka.controller.Controller
+import org.apache.kafka.controller.{Controller, ControllerRequestContext}
+import org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions._
@@ -517,7 +518,7 @@ class ControllerApisTest {
         setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
       new CreatableTopicResult().setName("quux").
         setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
-    assertEquals(expectedResponse, controllerApis.createTopics(request,
+    assertEquals(expectedResponse, controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
       false,
       _ => Set("baz", "indescribable"),
       _ => Set("baz")).get().topics().asScala.toSet)
@@ -537,7 +538,7 @@ class ControllerApisTest {
         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
         setErrorMessage("This server does not host this topic-partition."),
       new DeletableTopicResult().setName("foo").setTopicId(fooId))
-    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+    assertEquals(expectedResponse, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
       ApiKeys.DELETE_TOPICS.latestVersion().toInt,
       true,
       _ => Set.empty,
@@ -563,7 +564,7 @@ class ControllerApisTest {
         setErrorCode(UNKNOWN_TOPIC_ID.code()).
         setErrorMessage("This server does not host this topic ID."),
       new DeletableTopicResult().setName("foo").setTopicId(fooId))
-    assertEquals(response, controllerApis.deleteTopics(request,
+    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
       ApiKeys.DELETE_TOPICS.latestVersion().toInt,
       true,
       _ => Set.empty,
@@ -605,7 +606,7 @@ class ControllerApisTest {
       new DeletableTopicResult().setName(null).setTopicId(bazId).
         setErrorCode(INVALID_REQUEST.code()).
         setErrorMessage("Duplicate topic id."))
-    assertEquals(response, controllerApis.deleteTopics(request,
+    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
       ApiKeys.DELETE_TOPICS.latestVersion().toInt,
       false,
       names => names.toSet,
@@ -641,7 +642,7 @@ class ControllerApisTest {
       new DeletableTopicResult().setName("foo").setTopicId(fooId).
         setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
         setErrorMessage(TOPIC_AUTHORIZATION_FAILED.message))
-    assertEquals(response, controllerApis.deleteTopics(request,
+    assertEquals(response, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
       ApiKeys.DELETE_TOPICS.latestVersion().toInt,
       false,
       _ => Set("foo", "baz"),
@@ -666,7 +667,7 @@ class ControllerApisTest {
       new DeletableTopicResult().setName(null).setTopicId(barId).
         setErrorCode(UNKNOWN_TOPIC_ID.code).
         setErrorMessage(UNKNOWN_TOPIC_ID.message))
-    assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+    assertEquals(expectedResponse, controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
       ApiKeys.DELETE_TOPICS.latestVersion().toInt,
       false,
       _ => Set("foo"),
@@ -685,7 +686,7 @@ class ControllerApisTest {
     request.topics().add(new DeleteTopicState().setName(null).setTopicId(fooId))
     request.topics().add(new DeleteTopicState().setName(null).setTopicId(barId))
     assertEquals(classOf[NotControllerException], assertThrows(
-      classOf[ExecutionException], () => controllerApis.deleteTopics(request,
+      classOf[ExecutionException], () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
         ApiKeys.DELETE_TOPICS.latestVersion().toInt,
         false,
         _ => Set("foo", "bar"),
@@ -702,12 +703,14 @@ class ControllerApisTest {
     val controllerApis = createControllerApis(None, controller, props)
     val request = new DeleteTopicsRequestData()
     request.topics().add(new DeleteTopicState().setName("foo").setTopicId(ZERO_UUID))
-    assertThrows(classOf[TopicDeletionDisabledException], () => controllerApis.deleteTopics(request,
+    assertThrows(classOf[TopicDeletionDisabledException],
+      () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
         ApiKeys.DELETE_TOPICS.latestVersion().toInt,
         false,
         _ => Set("foo", "bar"),
         _ => Set("foo", "bar")))
-    assertThrows(classOf[InvalidRequestException], () => controllerApis.deleteTopics(request,
+    assertThrows(classOf[InvalidRequestException],
+      () => controllerApis.deleteTopics(ANONYMOUS_CONTEXT, request,
         1,
         false,
         _ => Set("foo", "bar"),
@@ -735,7 +738,8 @@ class ControllerApisTest {
       new CreatePartitionsTopicResult().setName("baz").
         setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
         setErrorMessage(null)),
-      controllerApis.createPartitions(request, _ => Set("foo", "bar")).get().asScala.toSet)
+      controllerApis.createPartitions(ANONYMOUS_CONTEXT, request,
+        _ => Set("foo", "bar")).get().asScala.toSet)
   }
 
   @Test
@@ -814,8 +818,8 @@ class ControllerApisTest {
     val responseData = new ElectLeadersResponseData()
         .setErrorCode(Errors.NOT_CONTROLLER.code)
 
-    when(controller.electLeaders(
-      request.data
+    when(controller.electLeaders(any[ControllerRequestContext],
+      ArgumentMatchers.eq(request.data)
     )).thenReturn(CompletableFuture.completedFuture(responseData))
 
     val response = handleRequest[ElectLeadersResponse](request, controllerApis)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index bece00354c..8e0ac959af 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -28,9 +28,10 @@ import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidReq
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 
 class ControllerConfigurationValidatorTest {
+  val validator = new ControllerConfigurationValidator()
+
   @Test
   def testDefaultTopicResourceIsRejected(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     assertEquals("Default topic resources are not allowed.",
         assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(TOPIC, ""), emptyMap())). getMessage())
@@ -38,7 +39,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidTopicNameRejected(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     assertEquals("Topic name \"(<-invalid->)\" is illegal, it contains a character " +
       "other than ASCII alphanumerics, '.', '_' and '-'",
         assertThrows(classOf[InvalidTopicException], () => validator.validate(
@@ -47,7 +47,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testUnknownResourceType(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     assertEquals("Unknown resource type BROKER_LOGGER",
       assertThrows(classOf[InvalidRequestException], () => validator.validate(
         new ConfigResource(BROKER_LOGGER, "foo"), emptyMap())). getMessage())
@@ -55,7 +54,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testNullTopicConfigValue(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     val config = new TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "10")
     config.put(SEGMENT_BYTES_CONFIG, null)
@@ -67,7 +65,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testValidTopicConfig(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     val config = new TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     config.put(SEGMENT_BYTES_CONFIG, "67108864")
@@ -76,7 +73,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidTopicConfig(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     val config = new TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     config.put(SEGMENT_BYTES_CONFIG, "67108864")
@@ -88,7 +84,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidBrokerEntity(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     val config = new TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     assertEquals("Unable to parse broker name as a base 10 number.",
@@ -98,7 +93,6 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidNegativeBrokerId(): Unit = {
-    val validator = new ControllerConfigurationValidator()
     val config = new TreeMap[String, String]()
     config.put(SEGMENT_JITTER_MS_CONFIG, "1000")
     assertEquals("Invalid negative broker ID.",
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 09491bf1e1..496f4dbcdf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -80,7 +80,13 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
         val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
         val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, newVal.toString()),
           SET)
-        admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+        val resource2 = new ConfigResource(ConfigResource.Type.BROKER, "")
+        val op2 = new AlterConfigOp(new ConfigEntry(KafkaConfig.LogFlushIntervalMsProp, newVal.toString()),
+          SET)
+        admin.incrementalAlterConfigs(Map(
+          resource -> List(op).asJavaCollection,
+          resource2 -> List(op2).asJavaCollection,
+        ).asJava).all.get
       } finally {
         admin.close()
       }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index deb6a115d2..fb0cfa69c7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -68,7 +68,7 @@ import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaFuture, TopicPartition}
 import org.apache.kafka.controller.QuorumController
-import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -2071,6 +2071,17 @@ object TestUtils extends Logging {
     }
   }
 
+  val anonymousAuthorizableContext = new AuthorizableRequestContext() {
+    override def listenerName(): String = ""
+    override def securityProtocol(): SecurityProtocol = SecurityProtocol.PLAINTEXT
+    override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
+    override def clientAddress(): InetAddress = null
+    override def requestType(): Int = 0
+    override def requestVersion(): Int = 0
+    override def clientId(): String = ""
+    override def correlationId(): Int = 0
+  }
+
   def addAndVerifyAcls[B <: KafkaBroker](
     brokers: Seq[B],
     acls: Set[AccessControlEntry],
@@ -2079,7 +2090,7 @@ object TestUtils extends Logging {
   ): Unit = {
     val authorizerForWrite = pickAuthorizerForWrite(brokers, controllers)
     val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
-    authorizerForWrite.createAcls(null, aclBindings.toList.asJava).asScala
+    authorizerForWrite.createAcls(anonymousAuthorizableContext, aclBindings.toList.asJava).asScala
       .map(_.toCompletableFuture.get)
       .foreach { result =>
         result.exception.ifPresent { e => throw e }
@@ -2100,7 +2111,7 @@ object TestUtils extends Logging {
   ): Unit = {
     val authorizerForWrite = pickAuthorizerForWrite(brokers, controllers)
     val aclBindingFilters = acls.map { acl => new AclBindingFilter(resource.toFilter, acl.toFilter) }
-    authorizerForWrite.deleteAcls(null, aclBindingFilters.toList.asJava).asScala
+    authorizerForWrite.deleteAcls(anonymousAuthorizableContext, aclBindingFilters.toList.asJava).asScala
       .map(_.toCompletableFuture.get)
       .foreach { result =>
         result.exception.ifPresent { e => throw e }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index b95f0d327f..2e71b76fcb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -17,18 +17,16 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metadata.UsableBroker;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.TreeSet;
@@ -437,27 +435,11 @@ public class BrokerHeartbeatManager {
         return Optional.empty();
     }
 
-    /**
-     * Place replicas on unfenced brokers.
-     *
-     * @param startPartition    The partition ID to start with.
-     * @param numPartitions     The number of partitions to place.
-     * @param numReplicas       The number of replicas for each partition.
-     * @param idToRack          A function mapping broker id to broker rack.
-     * @param placer            The replica placer to use.
-     *
-     * @return                  A list of replica lists.
-     *
-     * @throws InvalidReplicationFactorException    If too many replicas were requested.
-     */
-    List<List<Integer>> placeReplicas(int startPartition,
-                                      int numPartitions,
-                                      short numReplicas,
-                                      Function<Integer, Optional<String>> idToRack,
-                                      ReplicaPlacer placer) {
-        Iterator<UsableBroker> iterator = new UsableBrokerIterator(
-            brokers.values().iterator(), idToRack);
-        return placer.place(startPartition, numPartitions, numReplicas, iterator);
+    Iterator<UsableBroker> usableBrokers(
+        Function<Integer, Optional<String>> idToRack
+    ) {
+        return new UsableBrokerIterator(brokers.values().iterator(),
+            idToRack);
     }
 
     static class UsableBrokerIterator implements Iterator<UsableBroker> {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 29b41c797b..add0a53a76 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.controller;
 
 import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -38,6 +39,9 @@ import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.metadata.placement.ReplicaPlacer;
+import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -51,10 +55,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
 
 
@@ -64,6 +71,76 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKE
  * brokers being fenced or unfenced, and broker feature versions.
  */
 public class ClusterControlManager {
+    final static long DEFAULT_SESSION_TIMEOUT_NS = NANOSECONDS.convert(18, TimeUnit.SECONDS);
+
+    static class Builder {
+        private LogContext logContext = null;
+        private String clusterId = null;
+        private Time time = Time.SYSTEM;
+        private SnapshotRegistry snapshotRegistry = null;
+        private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
+        private ReplicaPlacer replicaPlacer = null;
+        private ControllerMetrics controllerMetrics = null;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setClusterId(String clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setSessionTimeoutNs(long sessionTimeoutNs) {
+            this.sessionTimeoutNs = sessionTimeoutNs;
+            return this;
+        }
+
+        Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
+            this.replicaPlacer = replicaPlacer;
+            return this;
+        }
+
+        Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
+            this.controllerMetrics = controllerMetrics;
+            return this;
+        }
+
+        ClusterControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (clusterId == null) {
+                clusterId = Uuid.randomUuid().toString();
+            }
+            if (snapshotRegistry == null) {
+                snapshotRegistry = new SnapshotRegistry(logContext);
+            }
+            if (replicaPlacer == null) {
+                replicaPlacer = new StripedReplicaPlacer(new Random());
+            }
+            if (controllerMetrics == null) {
+                throw new RuntimeException("You must specify controllerMetrics");
+            }
+            return new ClusterControlManager(logContext,
+                clusterId,
+                time,
+                snapshotRegistry,
+                sessionTimeoutNs,
+                replicaPlacer,
+                controllerMetrics);
+        }
+    }
+
     class ReadyBrokersFuture {
         private final CompletableFuture<Void> future;
         private final int minBrokers;
@@ -138,13 +215,15 @@ public class ClusterControlManager {
      */
     private Optional<ReadyBrokersFuture> readyBrokersFuture;
 
-    ClusterControlManager(LogContext logContext,
-                          String clusterId,
-                          Time time,
-                          SnapshotRegistry snapshotRegistry,
-                          long sessionTimeoutNs,
-                          ReplicaPlacer replicaPlacer,
-                          ControllerMetrics metrics) {
+    private ClusterControlManager(
+        LogContext logContext,
+        String clusterId,
+        Time time,
+        SnapshotRegistry snapshotRegistry,
+        long sessionTimeoutNs,
+        ReplicaPlacer replicaPlacer,
+        ControllerMetrics metrics
+    ) {
         this.logContext = logContext;
         this.clusterId = clusterId;
         this.log = logContext.logger(ClusterControlManager.class);
@@ -157,6 +236,10 @@ public class ClusterControlManager {
         this.controllerMetrics = metrics;
     }
 
+    ReplicaPlacer replicaPlacer() {
+        return replicaPlacer;
+    }
+
     /**
      * Transition this ClusterControlManager to active.
      */
@@ -370,15 +453,12 @@ public class ClusterControlManager {
         }
     }
 
-
-    public List<List<Integer>> placeReplicas(int startPartition,
-                                             int numPartitions,
-                                             short numReplicas) {
+    Iterator<UsableBroker> usableBrokers() {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
-        return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas,
-            id -> brokerRegistrations.get(id).rack(), replicaPlacer);
+        return heartbeatManager.usableBrokers(
+            id -> brokerRegistrations.get(id).rack());
     }
 
     public boolean unfenced(int brokerId) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 9c04e26656..558e55b902 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -148,6 +148,10 @@ public class ConfigurationControlManager {
         this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
     }
 
+    SnapshotRegistry snapshotRegistry() {
+        return snapshotRegistry;
+    }
+
     /**
      * Determine the result of applying a batch of incremental configuration changes.  Note
      * that this method does not change the contents of memory.  It just generates a
@@ -390,7 +394,7 @@ public class ConfigurationControlManager {
         for (Entry<ConfigResource, Collection<String>> resourceEntry : resources.entrySet()) {
             ConfigResource resource = resourceEntry.getKey();
             try {
-                validator.validate(resource, Collections.emptyMap());
+                validator.validate(resource);
             } catch (Throwable e) {
                 results.put(resource, new ResultOrError<>(ApiError.fromThrowable(e)));
                 continue;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
index b14580a4b7..7e8f505f40 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationValidator.java
@@ -23,7 +23,20 @@ import java.util.Map;
 
 
 public interface ConfigurationValidator {
-    ConfigurationValidator NO_OP = (__, ___) -> { };
+    ConfigurationValidator NO_OP = new ConfigurationValidator() {
+        @Override
+        public void validate(ConfigResource resource) { }
+
+        @Override
+        public void validate(ConfigResource resource, Map<String, String> config) { }
+    };
+
+    /**
+     * Throws an ApiException if the given resource is invalid to describe.
+     *
+     * @param resource      The configuration resource.
+     */
+    void validate(ConfigResource resource);
 
     /**
      * Throws an ApiException if a configuration is invalid for the given resource.
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 6f99765424..6da8fb5eda 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -57,167 +57,213 @@ public interface Controller extends AclMutator, AutoCloseable {
     /**
      * Change partition information.
      *
+     * @param context       The controller request context.
      * @param request       The AlterPartitionRequest data.
      *
      * @return              A future yielding the response.
      */
-    CompletableFuture<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request);
+    CompletableFuture<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    );
 
     /**
      * Create a batch of topics.
      *
+     * @param context       The controller request context.
      * @param request       The CreateTopicsRequest data.
      * @param describable   The topics which we have DESCRIBE permission on.
      *
      * @return              A future yielding the response.
      */
-    CompletableFuture<CreateTopicsResponseData>
-        createTopics(CreateTopicsRequestData request, Set<String> describable);
+    CompletableFuture<CreateTopicsResponseData> createTopics(
+        ControllerRequestContext context,
+        CreateTopicsRequestData request,
+        Set<String> describable
+    );
 
     /**
      * Unregister a broker.
      *
+     * @param context       The controller request context.
      * @param brokerId      The broker id to unregister.
      *
      * @return              A future that is completed successfully when the broker is
      *                      unregistered.
      */
-    CompletableFuture<Void> unregisterBroker(int brokerId);
+    CompletableFuture<Void> unregisterBroker(
+        ControllerRequestContext context,
+        int brokerId
+    );
 
     /**
      * Find the ids for topic names.
      *
-     * @param deadlineNs    The time by which this operation needs to be complete, before
-     *                      we will complete this operation with a timeout.
+     * @param context       The controller request context.
      * @param topicNames    The topic names to resolve.
      * @return              A future yielding a map from topic name to id.
      */
-    CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(long deadlineNs,
-                                                                     Collection<String> topicNames);
+    CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(
+        ControllerRequestContext context,
+        Collection<String> topicNames
+    );
 
     /**
      * Find the ids for all topic names. Note that this function should only be used for
      * integration tests.
      *
-     * @param deadlineNs    The time by which this operation needs to be complete, before
-     *                      we will complete this operation with a timeout.
+     * @param context       The controller request context.
      * @return              A future yielding a map from topic name to id.
      */
-    CompletableFuture<Map<String, Uuid>> findAllTopicIds(long deadlineNs);
+    CompletableFuture<Map<String, Uuid>> findAllTopicIds(
+        ControllerRequestContext context
+    );
 
     /**
      * Find the names for topic ids.
      *
-     * @param deadlineNs    The time by which this operation needs to be complete, before
-     *                      we will complete this operation with a timeout.
+     * @param context       The controller request context.
      * @param topicIds      The topic ids to resolve.
      * @return              A future yielding a map from topic id to name.
      */
-    CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(long deadlineNs,
-                                                                       Collection<Uuid> topicIds);
+    CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(
+        ControllerRequestContext context,
+        Collection<Uuid> topicIds
+    );
 
     /**
      * Delete a batch of topics.
      *
-     * @param deadlineNs    The time by which this operation needs to be complete, before
-     *                      we will complete this operation with a timeout.
+     * @param context       The controller request context.
      * @param topicIds      The IDs of the topics to delete.
      *
      * @return              A future yielding the response.
      */
-    CompletableFuture<Map<Uuid, ApiError>> deleteTopics(long deadlineNs,
-                                                        Collection<Uuid> topicIds);
+    CompletableFuture<Map<Uuid, ApiError>> deleteTopics(
+        ControllerRequestContext context,
+        Collection<Uuid> topicIds
+    );
 
     /**
      * Describe the current configuration of various resources.
      *
+     * @param context       The controller request context.
      * @param resources     A map from resources to the collection of config keys that we
      *                      want to describe for each.  If the collection is empty, then
      *                      all configuration keys will be described.
      *
-     * @return
+     * @return              A future yielding a map from config resources to results.
      */
-    CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
-        describeConfigs(Map<ConfigResource, Collection<String>> resources);
+    CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(
+        ControllerRequestContext context,
+        Map<ConfigResource, Collection<String>> resources
+    );
 
     /**
      * Elect new partition leaders.
      *
+     * @param context       The controller request context.
      * @param request       The request.
      *
      * @return              A future yielding the elect leaders response.
      */
-    CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request);
+    CompletableFuture<ElectLeadersResponseData> electLeaders(
+        ControllerRequestContext context,
+        ElectLeadersRequestData request
+    );
 
     /**
      * Get the current finalized feature ranges for each feature.
      *
+     * @param context       The controller request context.
+     *
      * @return              A future yielding the feature ranges.
      */
-    CompletableFuture<FinalizedControllerFeatures> finalizedFeatures();
+    CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
+        ControllerRequestContext context
+    );
 
     /**
      * Perform some incremental configuration changes.
      *
+     * @param context       The controller request context.
      * @param configChanges The changes.
      * @param validateOnly  True if we should validate the changes but not apply them.
      *
      * @return              A future yielding a map from config resources to error results.
      */
     CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+        ControllerRequestContext context,
         Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
-        boolean validateOnly);
+        boolean validateOnly
+    );
 
     /**
      * Start or stop some partition reassignments.
      *
+     * @param context       The controller request context.
      * @param request       The alter partition reassignments request.
      *
      * @return              A future yielding the results.
      */
-    CompletableFuture<AlterPartitionReassignmentsResponseData>
-        alterPartitionReassignments(AlterPartitionReassignmentsRequestData request);
+    CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
+        ControllerRequestContext context,
+        AlterPartitionReassignmentsRequestData request
+    );
 
     /**
      * List ongoing partition reassignments.
      *
+     * @param context       The controller request context.
      * @param request       The list partition reassignments request.
      *
      * @return              A future yielding the results.
      */
-    CompletableFuture<ListPartitionReassignmentsResponseData>
-        listPartitionReassignments(ListPartitionReassignmentsRequestData request);
+    CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(
+        ControllerRequestContext context,
+        ListPartitionReassignmentsRequestData request
+    );
 
     /**
      * Perform some configuration changes using the legacy API.
      *
+     * @param context       The controller request context.
      * @param newConfigs    The new configuration maps to apply.
      * @param validateOnly  True if we should validate the changes but not apply them.
      *
      * @return              A future yielding a map from config resources to error results.
      */
     CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
-        Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly);
+        ControllerRequestContext context,
+        Map<ConfigResource, Map<String, String>> newConfigs,
+        boolean validateOnly
+    );
 
     /**
      * Process a heartbeat from a broker.
      *
+     * @param context       The controller request context.
      * @param request      The broker heartbeat request.
      *
      * @return             A future yielding the broker heartbeat reply.
      */
     CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
-        BrokerHeartbeatRequestData request);
+        ControllerRequestContext context,
+        BrokerHeartbeatRequestData request
+    );
 
     /**
      * Attempt to register the given broker.
      *
+     * @param context       The controller request context.
      * @param request      The registration request.
      *
      * @return             A future yielding the broker registration reply.
      */
     CompletableFuture<BrokerRegistrationReply> registerBroker(
-        BrokerRegistrationRequestData request);
+        ControllerRequestContext context,
+        BrokerRegistrationRequestData request
+    );
 
     /**
      * Wait for the given number of brokers to be registered and unfenced.
@@ -232,29 +278,41 @@ public interface Controller extends AclMutator, AutoCloseable {
     /**
      * Perform some client quota changes
      *
-     * @param quotaAlterations The list of quotas to alter
-     * @param validateOnly     True if we should validate the changes but not apply them.
-     * @return                 A future yielding a map of quota entities to error results.
+     * @param context           The controller request context.
+     * @param quotaAlterations  The list of quotas to alter
+     * @param validateOnly      True if we should validate the changes but not apply them.
+     *
+     * @return                  A future yielding a map of quota entities to error results.
      */
     CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
-        Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
+        ControllerRequestContext context,
+        Collection<ClientQuotaAlteration> quotaAlterations,
+        boolean validateOnly
     );
 
     /**
      * Allocate a block of producer IDs for transactional and idempotent producers
+     *
+     * @param context   The controller request context.
      * @param request   The allocate producer IDs request
+     *
      * @return          A future which yields a new producer ID block as a response
      */
     CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+        ControllerRequestContext context,
         AllocateProducerIdsRequestData request
     );
 
     /**
      * Update a set of feature flags
+     *
+     * @param context   The controller request context.
      * @param request   The update features request
+     *
      * @return          A future which yields the result of the action
      */
     CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
+        ControllerRequestContext context,
         UpdateFeaturesRequestData request
     );
 
@@ -269,13 +327,13 @@ public interface Controller extends AclMutator, AutoCloseable {
     /**
      * Create partitions on certain topics.
      *
-     * @param deadlineNs    The time by which this operation needs to be complete, before
-     *                      we will complete this operation with a timeout.
      * @param topics        The list of topics to create partitions for.
      * @return              A future yielding per-topic results.
      */
-    CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics);
+    CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
+        ControllerRequestContext context,
+        List<CreatePartitionsTopic> topics
+    );
 
     /**
      * Begin shutting down, but don't block.  You must still call close to clean up all
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
new file mode 100644
index 0000000000..e18d68c587
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerRequestContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.OptionalLong;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+
+public class ControllerRequestContext {
+    public static final ControllerRequestContext ANONYMOUS_CONTEXT =
+        new ControllerRequestContext(KafkaPrincipal.ANONYMOUS,
+            OptionalLong.empty());
+
+    public static OptionalLong requestTimeoutMsToDeadlineNs(
+        Time time,
+        int millisecondsOffset
+    ) {
+        return OptionalLong.of(time.nanoseconds() + NANOSECONDS.convert(millisecondsOffset, MILLISECONDS));
+    }
+
+    private final KafkaPrincipal principal;
+
+    private final OptionalLong deadlineNs;
+
+    public ControllerRequestContext(
+        KafkaPrincipal principal,
+        OptionalLong deadlineNs
+    ) {
+        this.principal = principal;
+        this.deadlineNs = deadlineNs;
+    }
+
+    public KafkaPrincipal principal() {
+        return principal;
+    }
+
+    public OptionalLong deadlineNs() {
+        return deadlineNs;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index e42799fd1f..ff8ff78520 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -72,6 +72,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.SnapshotGenerator.Section;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.metadata.placement.ReplicaPlacer;
+import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -114,7 +116,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
@@ -153,7 +154,7 @@ public final class QuorumController implements Controller {
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
         private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
         private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
-        private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS);
+        private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
         private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
         private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
@@ -298,7 +299,7 @@ public final class QuorumController implements Controller {
 
     /**
      * Checks that a configuration resource exists.
-     *
+     * <p>
      * This object must be used only from the controller event thread.
      */
     class ConfigResourceExistenceChecker implements Consumer<ConfigResource> {
@@ -389,7 +390,7 @@ public final class QuorumController implements Controller {
             return exception;
         }
         log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
-            "Reverting to last committed offset {}.",
+                "Reverting to last committed offset {}.",
             name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
             lastCommittedOffset, exception);
         raftClient.resign(curClaimEpoch);
@@ -589,16 +590,17 @@ public final class QuorumController implements Controller {
         return replicationControl;
     }
 
-    // VisibleForTesting
-    <T> CompletableFuture<T> appendReadEvent(String name, Supplier<T> handler) {
-        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
-        queue.append(event);
-        return event.future();
-    }
-
-    <T> CompletableFuture<T> appendReadEvent(String name, long deadlineNs, Supplier<T> handler) {
+    <T> CompletableFuture<T> appendReadEvent(
+        String name,
+        OptionalLong deadlineNs,
+        Supplier<T> handler
+    ) {
         ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, handler);
-        queue.appendWithDeadline(deadlineNs, event);
+        if (deadlineNs.isPresent()) {
+            queue.appendWithDeadline(deadlineNs.getAsLong(), event);
+        } else {
+            queue.append(event);
+        }
         return event.future();
     }
 
@@ -608,17 +610,17 @@ public final class QuorumController implements Controller {
          * operation.  In general, this operation should not modify the "hard state" of
          * the controller.  That modification will happen later on, when we replay the
          * records generated by this function.
-         *
+         * <p>
          * There are cases where this function modifies the "soft state" of the
          * controller.  Mainly, this happens when we process cluster heartbeats.
-         *
+         * <p>
          * This function also generates an RPC result.  In general, if the RPC resulted in
          * an error, the RPC result will be an error, and the generated record list will
          * be empty.  This would happen if we tried to create a topic with incorrect
          * parameters, for example.  Of course, partial errors are possible for batch
          * operations.
          *
-         * @return              A result containing a list of records, and the RPC result.
+         * @return A result containing a list of records, and the RPC result.
          */
         ControllerResult<T> generateRecordsAndResult() throws Exception;
 
@@ -627,7 +629,8 @@ public final class QuorumController implements Controller {
          * with the end offset at which those records were placed.  If there were no
          * records to write, we'll just pass the last write offset.
          */
-        default void processBatchEndOffset(long offset) {}
+        default void processBatchEndOffset(long offset) {
+        }
     }
 
     /**
@@ -738,17 +741,14 @@ public final class QuorumController implements Controller {
     }
 
     private <T> CompletableFuture<T> appendWriteEvent(String name,
-                                                      long deadlineNs,
+                                                      OptionalLong deadlineNs,
                                                       ControllerWriteOperation<T> op) {
         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
-        queue.appendWithDeadline(deadlineNs, event);
-        return event.future();
-    }
-
-    private <T> CompletableFuture<T> appendWriteEvent(String name,
-                                                      ControllerWriteOperation<T> op) {
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
-        queue.append(event);
+        if (deadlineNs.isPresent()) {
+            queue.appendWithDeadline(deadlineNs.getAsLong(), event);
+        } else {
+            queue.append(event);
+        }
         return event.future();
     }
 
@@ -842,9 +842,9 @@ public final class QuorumController implements Controller {
                                     reader.snapshotId(),
                                     offset,
                                     messages
-                                      .stream()
-                                      .map(ApiMessageAndVersion::toString)
-                                      .collect(Collectors.joining(", "))
+                                        .stream()
+                                        .map(ApiMessageAndVersion::toString)
+                                        .collect(Collectors.joining(", "))
                                 );
                             } else {
                                 log.debug(
@@ -955,7 +955,7 @@ public final class QuorumController implements Controller {
         queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
         event.future.exceptionally(e -> {
             if (e instanceof UnknownServerException && e.getCause() != null &&
-                    e.getCause() instanceof RejectedExecutionException) {
+                e.getCause() instanceof RejectedExecutionException) {
                 log.error("Cancelling deferred write event {} because the event queue " +
                     "is now closed.", name);
                 return null;
@@ -995,8 +995,6 @@ public final class QuorumController implements Controller {
 
     private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
 
-    private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
-
     private void maybeScheduleNextBalancePartitionLeaders() {
         if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
             leaderImbalanceCheckIntervalNs.isPresent() &&
@@ -1354,15 +1352,31 @@ public final class QuorumController implements Controller {
             setNodeId(nodeId).
             build();
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
-        this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
-            snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
+        this.clusterControl = new ClusterControlManager.Builder().
+            setLogContext(logContext).
+            setClusterId(clusterId).
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(sessionTimeoutNs).
+            setReplicaPlacer(replicaPlacer).
+            setControllerMetrics(controllerMetrics).
+            build();
         this.featureControl = new FeatureControlManager(logContext, quorumFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
-        this.replicationControl = new ReplicationControlManager(snapshotRegistry,
-            logContext, defaultReplicationFactor, defaultNumPartitions, MAX_ELECTIONS_PER_IMBALANCE,
-            isLeaderRecoverySupported, configurationControl, clusterControl, controllerMetrics, createTopicPolicy);
+        this.replicationControl = new ReplicationControlManager.Builder().
+            setSnapshotRegistry(snapshotRegistry).
+            setLogContext(logContext).
+            setDefaultReplicationFactor(defaultReplicationFactor).
+            setDefaultNumPartitions(defaultNumPartitions).
+            setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
+            setIsLeaderRecoverySupported(isLeaderRecoverySupported).
+            setConfigurationControl(configurationControl).
+            setClusterControl(clusterControl).
+            setControllerMetrics(controllerMetrics).
+            setCreateTopicPolicy(createTopicPolicy).
+            build();
         this.authorizer = authorizer;
         authorizer.ifPresent(a -> a.setAclMutator(this));
         this.aclControlManager = new AclControlManager(snapshotRegistry, authorizer);
@@ -1377,95 +1391,120 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<AlterPartitionResponseData> alterPartition(AlterPartitionRequestData request) {
+    public CompletableFuture<AlterPartitionResponseData> alterPartition(
+        ControllerRequestContext context,
+        AlterPartitionRequestData request
+    ) {
         if (request.topics().isEmpty()) {
             return CompletableFuture.completedFuture(new AlterPartitionResponseData());
         }
-        return appendWriteEvent("alterPartition", () ->
-            replicationControl.alterPartition(request));
+        return appendWriteEvent("alterPartition", context.deadlineNs(),
+            () -> replicationControl.alterPartition(request));
     }
 
     @Override
-    public CompletableFuture<CreateTopicsResponseData>
-            createTopics(CreateTopicsRequestData request, Set<String> describable) {
+    public CompletableFuture<CreateTopicsResponseData> createTopics(
+        ControllerRequestContext context,
+        CreateTopicsRequestData request, Set<String> describable
+    ) {
         if (request.topics().isEmpty()) {
             return CompletableFuture.completedFuture(new CreateTopicsResponseData());
         }
-        return appendWriteEvent("createTopics",
-            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        return appendWriteEvent("createTopics", context.deadlineNs(),
             () -> replicationControl.createTopics(request, describable));
     }
 
     @Override
-    public CompletableFuture<Void> unregisterBroker(int brokerId) {
-        return appendWriteEvent("unregisterBroker",
+    public CompletableFuture<Void> unregisterBroker(
+        ControllerRequestContext context,
+        int brokerId
+    ) {
+        return appendWriteEvent("unregisterBroker", context.deadlineNs(),
             () -> replicationControl.unregisterBroker(brokerId));
     }
 
     @Override
-    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(long deadlineNs,
-                                                                            Collection<String> names) {
-        if (names.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
-        return appendReadEvent("findTopicIds", deadlineNs,
+    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(
+        ControllerRequestContext context,
+        Collection<String> names
+    ) {
+        if (names.isEmpty())
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicIds", context.deadlineNs(),
             () -> replicationControl.findTopicIds(lastCommittedOffset, names));
     }
 
     @Override
-    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(long deadlineNs) {
-        return appendReadEvent("findAllTopicIds", deadlineNs,
+    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(
+        ControllerRequestContext context
+    ) {
+        return appendReadEvent("findAllTopicIds", context.deadlineNs(),
             () -> replicationControl.findAllTopicIds(lastCommittedOffset));
     }
 
     @Override
-    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(long deadlineNs,
-                                                                              Collection<Uuid> ids) {
-        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
-        return appendReadEvent("findTopicNames", deadlineNs,
+    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(
+        ControllerRequestContext context,
+        Collection<Uuid> ids
+    ) {
+        if (ids.isEmpty())
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendReadEvent("findTopicNames", context.deadlineNs(),
             () -> replicationControl.findTopicNames(lastCommittedOffset, ids));
     }
 
     @Override
-    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(long deadlineNs,
-                                                               Collection<Uuid> ids) {
-        if (ids.isEmpty()) return CompletableFuture.completedFuture(Collections.emptyMap());
-        return appendWriteEvent("deleteTopics", deadlineNs,
+    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(
+        ControllerRequestContext context,
+        Collection<Uuid> ids
+    ) {
+        if (ids.isEmpty())
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        return appendWriteEvent("deleteTopics", context.deadlineNs(),
             () -> replicationControl.deleteTopics(ids));
     }
 
     @Override
-    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
-            describeConfigs(Map<ConfigResource, Collection<String>> resources) {
-        return appendReadEvent("describeConfigs", () ->
-            configurationControl.describeConfigs(lastCommittedOffset, resources));
+    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(
+        ControllerRequestContext context,
+        Map<ConfigResource, Collection<String>> resources
+    ) {
+        return appendReadEvent("describeConfigs", context.deadlineNs(),
+            () -> configurationControl.describeConfigs(lastCommittedOffset, resources));
     }
 
     @Override
-    public CompletableFuture<ElectLeadersResponseData>
-            electLeaders(ElectLeadersRequestData request) {
+    public CompletableFuture<ElectLeadersResponseData> electLeaders(
+        ControllerRequestContext context,
+        ElectLeadersRequestData request
+    ) {
         // If topicPartitions is null, we will try to trigger a new leader election on
         // all partitions (!).  But if it's empty, there is nothing to do.
         if (request.topicPartitions() != null && request.topicPartitions().isEmpty()) {
             return CompletableFuture.completedFuture(new ElectLeadersResponseData());
         }
-        return appendWriteEvent("electLeaders",
-            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        return appendWriteEvent("electLeaders", context.deadlineNs(),
             () -> replicationControl.electLeaders(request));
     }
 
     @Override
-    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures() {
-        return appendReadEvent("getFinalizedFeatures",
+    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
+        ControllerRequestContext context
+    ) {
+        return appendReadEvent("getFinalizedFeatures", context.deadlineNs(),
             () -> featureControl.finalizedFeatures(lastCommittedOffset));
     }
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+        ControllerRequestContext context,
         Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
-        boolean validateOnly) {
+        boolean validateOnly
+    ) {
         if (configChanges.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
-        return appendWriteEvent("incrementalAlterConfigs", () -> {
+        return appendWriteEvent("incrementalAlterConfigs", context.deadlineNs(), () -> {
             ControllerResult<Map<ConfigResource, ApiError>> result =
                 configurationControl.incrementalAlterConfigs(configChanges, false);
             if (validateOnly) {
@@ -1477,35 +1516,39 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<AlterPartitionReassignmentsResponseData>
-            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
+        ControllerRequestContext context,
+        AlterPartitionReassignmentsRequestData request
+    ) {
         if (request.topics().isEmpty()) {
             return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
         }
-        return appendWriteEvent("alterPartitionReassignments",
-            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        return appendWriteEvent("alterPartitionReassignments", context.deadlineNs(),
             () -> replicationControl.alterPartitionReassignments(request));
     }
 
     @Override
-    public CompletableFuture<ListPartitionReassignmentsResponseData>
-            listPartitionReassignments(ListPartitionReassignmentsRequestData request) {
+    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(
+        ControllerRequestContext context,
+        ListPartitionReassignmentsRequestData request
+    ) {
         if (request.topics() != null && request.topics().isEmpty()) {
             return CompletableFuture.completedFuture(
                 new ListPartitionReassignmentsResponseData().setErrorMessage(null));
         }
-        return appendReadEvent("listPartitionReassignments",
-            time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs(), MILLISECONDS),
+        return appendReadEvent("listPartitionReassignments", context.deadlineNs(),
             () -> replicationControl.listPartitionReassignments(request.topics()));
     }
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
-            Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly) {
+        ControllerRequestContext context,
+        Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly
+    ) {
         if (newConfigs.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
-        return appendWriteEvent("legacyAlterConfigs", () -> {
+        return appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), () -> {
             ControllerResult<Map<ConfigResource, ApiError>> result =
                 configurationControl.legacyAlterConfigs(newConfigs, false);
             if (validateOnly) {
@@ -1517,9 +1560,11 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<BrokerHeartbeatReply>
-            processBrokerHeartbeat(BrokerHeartbeatRequestData request) {
-        return appendWriteEvent("processBrokerHeartbeat",
+    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
+        ControllerRequestContext context,
+        BrokerHeartbeatRequestData request
+    ) {
+        return appendWriteEvent("processBrokerHeartbeat", context.deadlineNs(),
             new ControllerWriteOperation<BrokerHeartbeatReply>() {
                 private final int brokerId = request.brokerId();
                 private boolean inControlledShutdown = false;
@@ -1544,9 +1589,11 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<BrokerRegistrationReply>
-            registerBroker(BrokerRegistrationRequestData request) {
-        return appendWriteEvent("registerBroker", () -> {
+    public CompletableFuture<BrokerRegistrationReply> registerBroker(
+        ControllerRequestContext context,
+        BrokerRegistrationRequestData request
+    ) {
+        return appendWriteEvent("registerBroker", context.deadlineNs(), () -> {
             ControllerResult<BrokerRegistrationReply> result = clusterControl.
                 registerBroker(request, writeOffset + 1, featureControl.
                     finalizedFeatures(Long.MAX_VALUE));
@@ -1557,11 +1604,14 @@ public final class QuorumController implements Controller {
 
     @Override
     public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
-            Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly) {
+        ControllerRequestContext context,
+        Collection<ClientQuotaAlteration> quotaAlterations,
+        boolean validateOnly
+    ) {
         if (quotaAlterations.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
-        return appendWriteEvent("alterClientQuotas", () -> {
+        return appendWriteEvent("alterClientQuotas", context.deadlineNs(), () -> {
             ControllerResult<Map<ClientQuotaEntity, ApiError>> result =
                 clientQuotaControlManager.alterClientQuotas(quotaAlterations);
             if (validateOnly) {
@@ -1574,18 +1624,22 @@ public final class QuorumController implements Controller {
 
     @Override
     public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
-            AllocateProducerIdsRequestData request) {
-        return appendWriteEvent("allocateProducerIds",
+        ControllerRequestContext context,
+        AllocateProducerIdsRequestData request
+    ) {
+        return appendWriteEvent("allocateProducerIds", context.deadlineNs(),
             () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
             .thenApply(result -> new AllocateProducerIdsResponseData()
-                    .setProducerIdStart(result.firstProducerId())
-                    .setProducerIdLen(result.size()));
+                .setProducerIdStart(result.firstProducerId())
+                .setProducerIdLen(result.size()));
     }
 
     @Override
     public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
-            UpdateFeaturesRequestData request) {
-        return appendWriteEvent("updateFeatures", () -> {
+        ControllerRequestContext context,
+        UpdateFeaturesRequestData request
+    ) {
+        return appendWriteEvent("updateFeatures", context.deadlineNs(), () -> {
             Map<String, Short> updates = new HashMap<>();
             Map<String, FeatureUpdate.UpgradeType> upgradeTypes = new HashMap<>();
             request.featureUpdates().forEach(featureUpdate -> {
@@ -1608,12 +1662,14 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<List<CreatePartitionsTopicResult>>
-            createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {
+    public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(
+        ControllerRequestContext context,
+        List<CreatePartitionsTopic> topics
+    ) {
         if (topics.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
-        return appendWriteEvent("createPartitions", deadlineNs,
+        return appendWriteEvent("createPartitions", context.deadlineNs(),
             () -> replicationControl.createPartitions(topics));
     }
 
@@ -1634,13 +1690,21 @@ public final class QuorumController implements Controller {
     }
 
     @Override
-    public CompletableFuture<List<AclCreateResult>> createAcls(List<AclBinding> aclBindings) {
-        return appendWriteEvent("createAcls", () -> aclControlManager.createAcls(aclBindings));
+    public CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    ) {
+        return appendWriteEvent("createAcls", context.deadlineNs(),
+            () -> aclControlManager.createAcls(aclBindings));
     }
 
     @Override
-    public CompletableFuture<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> filters) {
-        return appendWriteEvent("deleteAcls", () -> aclControlManager.deleteAcls(filters));
+    public CompletableFuture<List<AclDeleteResult>> deleteAcls(
+        ControllerRequestContext context,
+        List<AclBindingFilter> filters
+    ) {
+        return appendWriteEvent("deleteAcls", context.deadlineNs(),
+            () -> aclControlManager.deleteAcls(filters));
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index f104364d1a..c9d6a5997f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -74,6 +74,9 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
+import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
@@ -132,6 +135,102 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
  * of each partition, as well as administrative tasks like creating or deleting topics.
  */
 public class ReplicationControlManager {
+    static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+    static class Builder {
+        private SnapshotRegistry snapshotRegistry = null;
+        private LogContext logContext = null;
+        private short defaultReplicationFactor = (short) 3;
+        private int defaultNumPartitions = 1;
+        private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
+        private boolean isLeaderRecoverySupported = true;
+        private ConfigurationControlManager configurationControl = null;
+        private ClusterControlManager clusterControl = null;
+        private ControllerMetrics controllerMetrics = null;
+        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
+            this.defaultReplicationFactor = defaultReplicationFactor;
+            return this;
+        }
+
+        Builder setDefaultNumPartitions(int defaultNumPartitions) {
+            this.defaultNumPartitions = defaultNumPartitions;
+            return this;
+        }
+
+        Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
+            this.maxElectionsPerImbalance = maxElectionsPerImbalance;
+            return this;
+        }
+
+        Builder setIsLeaderRecoverySupported(boolean isLeaderRecoverySupported) {
+            this.isLeaderRecoverySupported = isLeaderRecoverySupported;
+            return this;
+        }
+
+        Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
+            this.configurationControl = configurationControl;
+            return this;
+        }
+
+        Builder setClusterControl(ClusterControlManager clusterControl) {
+            this.clusterControl = clusterControl;
+            return this;
+        }
+
+        Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
+            this.controllerMetrics = controllerMetrics;
+            return this;
+        }
+
+        Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
+            this.createTopicPolicy = createTopicPolicy;
+            return this;
+        }
+
+        ReplicationControlManager build() {
+            if (configurationControl == null) {
+                throw new RuntimeException("You must specify configurationControl.");
+            }
+            if (clusterControl == null) {
+                throw new RuntimeException("You must specify clusterControl.");
+            }
+            if (controllerMetrics == null) {
+                throw new RuntimeException("You must specify controllerMetrics.");
+            }
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
+            return new ReplicationControlManager(snapshotRegistry,
+                logContext,
+                defaultReplicationFactor,
+                defaultNumPartitions,
+                maxElectionsPerImbalance,
+                isLeaderRecoverySupported,
+                configurationControl,
+                clusterControl,
+                controllerMetrics,
+                createTopicPolicy);
+        }
+    }
+
+    class KRaftClusterDescriber implements ClusterDescriber {
+        @Override
+        public Iterator<UsableBroker> usableBrokers() {
+            return clusterControl.usableBrokers();
+        }
+    }
+
     static class TopicControlInfo {
         private final String name;
         private final Uuid id;
@@ -253,16 +352,23 @@ public class ReplicationControlManager {
      */
     private final TimelineHashSet<TopicIdPartition> imbalancedPartitions;
 
-    ReplicationControlManager(SnapshotRegistry snapshotRegistry,
-                              LogContext logContext,
-                              short defaultReplicationFactor,
-                              int defaultNumPartitions,
-                              int maxElectionsPerImbalance,
-                              boolean isLeaderRecoverySupported,
-                              ConfigurationControlManager configurationControl,
-                              ClusterControlManager clusterControl,
-                              ControllerMetrics controllerMetrics,
-                              Optional<CreateTopicPolicy> createTopicPolicy) {
+    /**
+     * A ClusterDescriber which supplies cluster information to our ReplicaPlacer.
+     */
+    private final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
+
+    private ReplicationControlManager(
+        SnapshotRegistry snapshotRegistry,
+        LogContext logContext,
+        short defaultReplicationFactor,
+        int defaultNumPartitions,
+        int maxElectionsPerImbalance,
+        boolean isLeaderRecoverySupported,
+        ConfigurationControlManager configurationControl,
+        ClusterControlManager clusterControl,
+        ControllerMetrics controllerMetrics,
+        Optional<CreateTopicPolicy> createTopicPolicy
+    ) {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(ReplicationControlManager.class);
         this.defaultReplicationFactor = defaultReplicationFactor;
@@ -567,8 +673,11 @@ public class ReplicationControlManager {
             short replicationFactor = topic.replicationFactor() == -1 ?
                 defaultReplicationFactor : topic.replicationFactor();
             try {
-                List<List<Integer>> replicas = clusterControl.
-                    placeReplicas(0, numPartitions, replicationFactor);
+                List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+                    0,
+                    numPartitions,
+                    replicationFactor
+                ), clusterDescriber);
                 for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
                     int[] r = Replicas.toArray(replicas.get(partitionId));
                     newParts.put(partitionId,
@@ -1318,8 +1427,11 @@ public class ReplicationControlManager {
                 isrs.add(isr);
             }
         } else {
-            placements = clusterControl.placeReplicas(startPartitionId, additional,
-                replicationFactor);
+            placements = clusterControl.replicaPlacer().place(new PlacementSpec(
+                startPartitionId,
+                additional,
+                replicationFactor
+            ), clusterDescriber);
             isrs = placements;
         }
         int partitionId = startPartitionId;
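
    The Builder above replaces the sprawling package-private constructor, so adding a
    parameter no longer has to touch every construction site. A minimal sketch of how a
    caller might use it; configurationControl, clusterControl, and metrics are assumed to
    be already-built managers, and unset fields (logContext, snapshotRegistry) fall back
    to the defaults handled in build():

        ReplicationControlManager replicationControl = new ReplicationControlManager.Builder().
            setDefaultReplicationFactor((short) 3).
            setDefaultNumPartitions(1).
            setConfigurationControl(configurationControl). // required
            setClusterControl(clusterControl).             // required
            setControllerMetrics(metrics).                 // required
            build();
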
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclMutator.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclMutator.java
index 95cd3702bf..9755847165 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclMutator.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/AclMutator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.metadata.authorizer;
 
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 
@@ -37,17 +38,26 @@ public interface AclMutator {
      * Create the specified ACLs. If any ACL already exists, nothing will be done for that
      * one, and we will return a success result for it.
      *
+     * @param context       The controller request context.
      * @param aclBindings   The ACLs to create.
+     *
      * @return              The results for each AclBinding, in the order they were passed.
      */
-    CompletableFuture<List<AclCreateResult>> createAcls(List<AclBinding> aclBindings);
+    CompletableFuture<List<AclCreateResult>> createAcls(
+        ControllerRequestContext context,
+        List<AclBinding> aclBindings
+    );
 
     /**
      * Delete some ACLs based on the set of filters that is passed in.
      *
+     * @param context               The controller request context.
      * @param aclBindingFilters     The filters.
+     *
      * @return                      The results for each filter, in the order they were passed.
      */
     CompletableFuture<List<AclDeleteResult>> deleteAcls(
-            List<AclBindingFilter> aclBindingFilters);
+        ControllerRequestContext context,
+        List<AclBindingFilter> aclBindingFilters
+    );
 }
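
    Every AclMutator call now carries a ControllerRequestContext. A minimal sketch of the
    new call shape, assuming the two-argument ControllerRequestContext constructor (the
    principal plus an optional deadline) used by ClusterMetadataAuthorizer below:

        // Sketch only: forward an anonymous principal with no deadline attached.
        ControllerRequestContext context = new ControllerRequestContext(
            KafkaPrincipal.ANONYMOUS, OptionalLong.empty());
        CompletableFuture<List<AclCreateResult>> results =
            aclMutator.createAcls(context, aclBindings);
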
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
index 90117004b3..cae3528333 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
@@ -31,6 +32,7 @@ import org.apache.kafka.server.authorizer.Authorizer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
@@ -88,7 +90,9 @@ public interface ClusterMetadataAuthorizer extends Authorizer {
         List<CompletableFuture<AclCreateResult>> futures = new ArrayList<>(aclBindings.size());
         AclMutator aclMutator = aclMutatorOrException();
         aclBindings.forEach(b -> futures.add(new CompletableFuture<>()));
-        aclMutator.createAcls(aclBindings).whenComplete((results, throwable) -> {
+        ControllerRequestContext context = new ControllerRequestContext(
+            requestContext.principal(), OptionalLong.empty());
+        aclMutator.createAcls(context, aclBindings).whenComplete((results, throwable) -> {
             if (throwable == null && results.size() != futures.size()) {
                 throwable = new UnknownServerException("Invalid size " +
                     "of result set from controller. Expected " + futures.size() +
@@ -126,7 +130,9 @@ public interface ClusterMetadataAuthorizer extends Authorizer {
         List<CompletableFuture<AclDeleteResult>> futures = new ArrayList<>(filters.size());
         AclMutator aclMutator = aclMutatorOrException();
         filters.forEach(b -> futures.add(new CompletableFuture<>()));
-        aclMutator.deleteAcls(filters).whenComplete((results, throwable) -> {
+        ControllerRequestContext context = new ControllerRequestContext(
+            requestContext.principal(), OptionalLong.empty());
+        aclMutator.deleteAcls(context, filters).whenComplete((results, throwable) -> {
             if (throwable == null && results.size() != futures.size()) {
                 throwable = new UnknownServerException("Invalid size " +
                     "of result set from controller. Expected " + futures.size() +
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockRandom.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
similarity index 69%
copy from metadata/src/test/java/org/apache/kafka/controller/MockRandom.java
copy to metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
index c42a158b66..8aaa092205 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockRandom.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/ClusterDescriber.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.metadata.placement;
 
-import java.util.Random;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Iterator;
 
 
 /**
- * A subclass of Random with a fixed seed and generation algorithm.
+ * Can describe a cluster to a ReplicaPlacer.
  */
-public class MockRandom extends Random {
-    private long state = 17;
-
-    @Override
-    protected int next(int bits) {
-        state = (state * 2862933555777941757L) + 3037000493L;
-        return (int) (state >>> (64 - bits));
-    }
+@InterfaceStability.Unstable
+public interface ClusterDescriber {
+    /**
+     * Get an iterator through the usable brokers.
+     */
+    Iterator<UsableBroker> usableBrokers();
 }
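
    ClusterDescriber is what lets the placement code live outside the controller package:
    in production, KRaftClusterDescriber wraps ClusterControlManager, while tests can hand
    a ReplicaPlacer any broker list they like. A sketch of a fixed-list implementation;
    the UsableBroker(id, rack, fenced) constructor is assumed from the fields shown in
    that class further down:

        // Hypothetical test-only describer over two unfenced brokers.
        ClusterDescriber describer = new ClusterDescriber() {
            @Override
            public Iterator<UsableBroker> usableBrokers() {
                return Arrays.asList(
                    new UsableBroker(0, Optional.empty(), false),
                    new UsableBroker(1, Optional.of("rack-a"), false)).iterator();
            }
        };
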
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java
new file mode 100644
index 0000000000..85daaf59e5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/PlacementSpec.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+
+
+/**
+ * Specifies a replica placement that we want to make.
+ */
+@InterfaceStability.Unstable
+public class PlacementSpec {
+    private final int startPartition;
+
+    private final int numPartitions;
+
+    private final short numReplicas;
+
+    public PlacementSpec(
+        int startPartition,
+        int numPartitions,
+        short numReplicas
+    ) {
+        this.startPartition = startPartition;
+        this.numPartitions = numPartitions;
+        this.numReplicas = numReplicas;
+    }
+
+    public int startPartition() {
+        return startPartition;
+    }
+
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    public short numReplicas() {
+        return numReplicas;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null) return false;
+        if (!(o.getClass().equals(this.getClass()))) return false;
+        PlacementSpec other = (PlacementSpec) o;
+        return startPartition == other.startPartition &&
+            numPartitions == other.numPartitions &&
+            numReplicas == other.numReplicas;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startPartition,
+            numPartitions,
+            numReplicas);
+    }
+
+    @Override
+    public String toString() {
+        return "PlacementSpec" +
+            "(startPartition=" + startPartition +
+            ", numPartitions=" + numPartitions +
+            ", numReplicas=" + numReplicas +
+            ")";
+    }
+}
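
    PlacementSpec bundles the three integers that place() used to take positionally, and
    its field-based equals() and hashCode() make specs directly comparable in assertions.
    For example, a spec requesting placements for partitions 0 through 9 at replication
    factor 3:

        PlacementSpec spec = new PlacementSpec(0, 10, (short) 3);
        // spec.startPartition() == 0, spec.numPartitions() == 10, spec.numReplicas() == 3
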
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java
similarity index 59%
rename from metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java
rename to metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java
index 9a705f43d8..6af37fd960 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java
@@ -15,36 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.metadata.placement;
 
-import java.util.Iterator;
 import java.util.List;
+
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
-import org.apache.kafka.metadata.UsableBroker;
 
 
 /**
  * The interface which a Kafka replica placement policy must implement.
  */
 @InterfaceStability.Unstable
-interface ReplicaPlacer {
+public interface ReplicaPlacer {
     /**
      * Create a new replica placement.
      *
-     * @param startPartition        The partition ID to start with.
-     * @param numPartitions         The number of partitions to create placements for.
-     * @param numReplicas           The number of replicas to create for each partitions.
-     *                              Must be positive.
-     * @param iterator              An iterator that yields all the usable brokers.
+     * @param placement     What we're trying to place.
+     * @param cluster       A description of the cluster we're trying to place in.
      *
-     * @return                      A list of replica lists.
+     * @return              A list of replica lists.
      *
      * @throws InvalidReplicationFactorException    If too many replicas were requested.
      */
-    List<List<Integer>> place(int startPartition,
-                              int numPartitions,
-                              short numReplicas,
-                              Iterator<UsableBroker> iterator)
-        throws InvalidReplicationFactorException;
+    List<List<Integer>> place(
+        PlacementSpec placement,
+        ClusterDescriber cluster
+    ) throws InvalidReplicationFactorException;
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
similarity index 95%
rename from metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
rename to metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index 031354c56d..43f41179d4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.metadata.placement;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,7 +28,6 @@ import java.util.Random;
 
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.metadata.OptionalStringComparator;
-import org.apache.kafka.metadata.UsableBroker;
 
 
 /**
@@ -428,17 +427,18 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
     }
 
     @Override
-    public List<List<Integer>> place(int startPartition,
-                                     int numPartitions,
-                                     short replicationFactor,
-                                     Iterator<UsableBroker> iterator) {
-        RackList rackList = new RackList(random, iterator);
-        throwInvalidReplicationFactorIfNonPositive(replicationFactor);
+    public List<List<Integer>> place(
+        PlacementSpec placement,
+        ClusterDescriber cluster
+    ) throws InvalidReplicationFactorException {
+        RackList rackList = new RackList(random, cluster.usableBrokers());
+        throwInvalidReplicationFactorIfNonPositive(placement.numReplicas());
         throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
-        throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, rackList.numTotalBrokers());
-        List<List<Integer>> placements = new ArrayList<>(numPartitions);
-        for (int partition = 0; partition < numPartitions; partition++) {
-            placements.add(rackList.place(replicationFactor));
+        throwInvalidReplicationFactorIfTooFewBrokers(placement.numReplicas(),
+            rackList.numTotalBrokers());
+        List<List<Integer>> placements = new ArrayList<>(placement.numPartitions());
+        for (int partition = 0; partition < placement.numPartitions(); partition++) {
+            placements.add(rackList.place(placement.numReplicas()));
         }
         return placements;
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
similarity index 83%
rename from metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
rename to metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
index 9c04ebd480..75d16d7718 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/UsableBroker.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.metadata;
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -24,6 +26,7 @@ import java.util.Optional;
 /**
  * A broker where a replica can be placed.
  */
+@InterfaceStability.Unstable
 public class UsableBroker {
     private final int id;
 
@@ -58,11 +61,17 @@ public class UsableBroker {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, rack, fenced);
+        return Objects.hash(id,
+            rack,
+            fenced);
     }
 
     @Override
     public String toString() {
-        return "UsableBroker(id=" + id + ", rack=" + rack + ", fenced=" + fenced + ")";
+        return "UsableBroker" +
+            "(id=" + id +
+            ", rack=" + rack +
+            ", fenced=" + fenced +
+            ")";
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index c5c46abab3..28387b17a0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
 import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateIterator;
 import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatStateList;
 import org.apache.kafka.controller.BrokerHeartbeatManager.UsableBrokerIterator;
-import org.apache.kafka.metadata.UsableBroker;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index ea223d7de8..a173cf5f94 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -20,9 +20,9 @@ package org.apache.kafka.controller;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-import java.util.Random;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InconsistentClusterIdException;
@@ -39,6 +39,9 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
+import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
@@ -59,9 +62,12 @@ public class ClusterControlManagerTest {
         MockTime time = new MockTime(0, 0, 0);
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
-                new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
 
@@ -90,10 +96,13 @@ public class ClusterControlManagerTest {
     @Test
     public void testRegistrationWithIncorrectClusterId() throws Exception {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), "fPZv1VBsRFmnlRvmGcOW9w", new MockTime(0, 0, 0),
-            snapshotRegistry, 1000,
-            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
+            setTime(new MockTime(0, 0, 0)).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
         clusterControl.activate();
         assertThrows(InconsistentClusterIdException.class, () ->
             clusterControl.registerBroker(new BrokerRegistrationRequestData().
@@ -118,10 +127,12 @@ public class ClusterControlManagerTest {
             setName("PLAINTEXT").
             setHost("example.com"));
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), Uuid.randomUuid().toString(), new MockTime(0, 0, 0),
-            snapshotRegistry, 1000,
-            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setTime(new MockTime(0, 0, 0)).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
         clusterControl.activate();
         clusterControl.replay(brokerRecord);
         assertEquals(new BrokerRegistration(1, 100,
@@ -141,10 +152,12 @@ public class ClusterControlManagerTest {
     public void testPlaceReplicas(int numUsableBrokers) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        MockRandom random = new MockRandom();
-        ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(),  Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(random), new MockControllerMetrics());
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
         clusterControl.activate();
         for (int i = 0; i < numUsableBrokers; i++) {
             RegisterBrokerRecord brokerRecord =
@@ -165,7 +178,17 @@ public class ClusterControlManagerTest {
                 String.format("broker %d was not unfenced.", i));
         }
         for (int i = 0; i < 100; i++) {
-            List<List<Integer>> results = clusterControl.placeReplicas(0, 1, (short) 3);
+            List<List<Integer>> results = clusterControl.replicaPlacer().place(
+                new PlacementSpec(0,
+                    1,
+                    (short) 3),
+                new ClusterDescriber() {
+                    @Override
+                    public Iterator<UsableBroker> usableBrokers() {
+                        return clusterControl.usableBrokers();
+                    }
+                }
+            );
             HashSet<Integer> seen = new HashSet<>();
             for (Integer result : results.get(0)) {
                 assertTrue(result >= 0);
@@ -179,9 +202,12 @@ public class ClusterControlManagerTest {
     public void testIterator() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
-        ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
         for (int i = 0; i < 3; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 21613607dc..284b8f7c16 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -33,7 +32,6 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -49,14 +47,14 @@ public class ProducerIdControlManagerTest {
 
     @BeforeEach
     public void setUp() {
-        final LogContext logContext = new LogContext();
-        String clusterId = Uuid.randomUuid().toString();
         final MockTime time = new MockTime();
-        final Random random = new Random();
-        snapshotRegistry = new SnapshotRegistry(logContext);
-        clusterControl = new ClusterControlManager(
-            logContext, clusterId, time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(random), new MockControllerMetrics());
+        snapshotRegistry = new SnapshotRegistry(new LogContext());
+        clusterControl = new ClusterControlManager.Builder().
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(1000).
+            setControllerMetrics(new MockControllerMetrics()).
+            build();
 
         clusterControl.activate();
         for (int i = 0; i < 4; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 40b7274f02..9f569251de 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -41,6 +41,7 @@ import java.util.stream.StreamSupport;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -90,13 +91,13 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import static java.util.concurrent.TimeUnit.HOURS;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
+import static org.apache.kafka.controller.ControllerRequestContext.ANONYMOUS_CONTEXT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -134,7 +135,8 @@ public class QuorumControllerTest {
                 b.setConfigSchema(SCHEMA);
             })
         ) {
-            controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
+            controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData().
                 setBrokerId(0).setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
         }
@@ -142,18 +144,18 @@ public class QuorumControllerTest {
 
     private void testConfigurationOperations(QuorumController controller) throws Throwable {
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
-            controller.incrementalAlterConfigs(Collections.singletonMap(
+            controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), true).get());
         assertEquals(Collections.singletonMap(BROKER0,
             new ResultOrError<>(Collections.emptyMap())),
-            controller.describeConfigs(Collections.singletonMap(
+            controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE),
-            controller.incrementalAlterConfigs(Collections.singletonMap(
+            controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false).get());
         assertEquals(Collections.singletonMap(BROKER0, new ResultOrError<>(Collections.
                 singletonMap("baz", "123"))),
-            controller.describeConfigs(Collections.singletonMap(
+            controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
     }
 
@@ -169,8 +171,9 @@ public class QuorumControllerTest {
                 b.setConfigSchema(SCHEMA);
             })
         ) {
-            controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
-                setBrokerId(0).setClusterId(logEnv.clusterId())).get();
+            controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData().
+                    setBrokerId(0).setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
         }
     }
@@ -178,14 +181,14 @@ public class QuorumControllerTest {
     private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
                                                     QuorumController controller)
                                                     throws Throwable {
-        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
+        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(1L));
         CompletableFuture<Map<ConfigResource, ApiError>> future1 =
-            controller.incrementalAlterConfigs(Collections.singletonMap(
+            controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.singletonMap("baz", entry(SET, "123"))), false);
         assertFalse(future1.isDone());
         assertEquals(Collections.singletonMap(BROKER0,
             new ResultOrError<>(Collections.emptyMap())),
-            controller.describeConfigs(Collections.singletonMap(
+            controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
                 BROKER0, Collections.emptyList())).get());
         logEnv.logManagers().forEach(m -> m.setMaxReadOffset(3L));
         assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
@@ -213,6 +216,7 @@ public class QuorumControllerTest {
 
             for (Integer brokerId : allBrokers) {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    ANONYMOUS_CONTEXT,
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
@@ -234,7 +238,8 @@ public class QuorumControllerTest {
                     new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
                         setReplicationFactor(replicationFactor)).iterator()));
             CreateTopicsResponseData createTopicsResponseData = active.createTopics(
-                createTopicsRequestData, Collections.singleton("foo")).get();
+                ANONYMOUS_CONTEXT, createTopicsRequestData,
+                Collections.singleton("foo")).get();
             assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
 
@@ -303,6 +308,7 @@ public class QuorumControllerTest {
 
             for (Integer brokerId : allBrokers) {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    ANONYMOUS_CONTEXT,
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
@@ -324,7 +330,7 @@ public class QuorumControllerTest {
                     new CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
                         setReplicationFactor(replicationFactor)).iterator()));
             CreateTopicsResponseData createTopicsResponseData = active.createTopics(
-                createTopicsRequestData, Collections.singleton("foo")).get();
+                ANONYMOUS_CONTEXT, createTopicsRequestData, Collections.singleton("foo")).get();
             assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
 
@@ -362,6 +368,7 @@ public class QuorumControllerTest {
             // Re-register all fenced brokers
             for (Integer brokerId : brokersToFence) {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    ANONYMOUS_CONTEXT,
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
                         setClusterId(active.clusterId()).
@@ -393,7 +400,7 @@ public class QuorumControllerTest {
                 .setBrokerEpoch(brokerEpochs.get(partitionRegistration.leader));
             alterPartitionRequest.topics().add(topicData);
 
-            active.alterPartition(alterPartitionRequest).get();
+            active.alterPartition(ANONYMOUS_CONTEXT, alterPartitionRequest).get();
 
             // Check that partitions are balanced
             AtomicLong lastHeartbeat = new AtomicLong(active.time().milliseconds());
@@ -422,6 +429,7 @@ public class QuorumControllerTest {
                     setHost("localhost").setPort(9092));
                 QuorumController active = controlEnv.activeController();
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    ANONYMOUS_CONTEXT,
                     new BrokerRegistrationRequestData().
                         setBrokerId(0).
                         setClusterId(active.clusterId()).
@@ -435,28 +443,31 @@ public class QuorumControllerTest {
                             new CreatableTopic().setName("foo").setNumPartitions(1).
                                 setReplicationFactor((short) 1)).iterator()));
                 assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics(
+                    ANONYMOUS_CONTEXT,
                     createTopicsRequestData, Collections.singleton("foo")).get().
                         topics().find("foo").errorCode());
                 assertEquals("Unable to replicate the partition 1 time(s): All brokers " +
-                    "are currently fenced.", active.createTopics(createTopicsRequestData,
-                        Collections.singleton("foo")).get().topics().find("foo").errorMessage());
+                    "are currently fenced.", active.createTopics(ANONYMOUS_CONTEXT,
+                        createTopicsRequestData, Collections.singleton("foo")).
+                            get().topics().find("foo").errorMessage());
                 assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
                             setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).
                             setCurrentMetadataOffset(100000L)).get());
-                assertEquals(Errors.NONE.code(), active.createTopics(createTopicsRequestData,
-                    Collections.singleton("foo")).get().topics().find("foo").errorCode());
+                assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
+                    createTopicsRequestData, Collections.singleton("foo")).
+                        get().topics().find("foo").errorCode());
                 CompletableFuture<TopicIdPartition> topicPartitionFuture = active.appendReadEvent(
-                    "debugGetPartition", () -> {
+                    "debugGetPartition", OptionalLong.empty(), () -> {
                         Iterator<TopicIdPartition> iterator = active.
                             replicationControl().brokersToIsrs().iterator(0, true);
                         assertTrue(iterator.hasNext());
                         return iterator.next();
                     });
                 assertEquals(0, topicPartitionFuture.get().partitionId());
-                active.unregisterBroker(0).get();
+                active.unregisterBroker(ANONYMOUS_CONTEXT, 0).get();
                 topicPartitionFuture = active.appendReadEvent(
-                    "debugGetPartition", () -> {
+                    "debugGetPartition", OptionalLong.empty(), () -> {
                         Iterator<TopicIdPartition> iterator = active.
                             replicationControl().brokersToIsrs().partitionsWithNoLeader();
                         assertTrue(iterator.hasNext());
@@ -484,7 +495,7 @@ public class QuorumControllerTest {
             })) {
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(
+                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
@@ -497,11 +508,11 @@ public class QuorumControllerTest {
                 }
                 for (int i = 0; i < numBrokers - 1; i++) {
                     assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
                             setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
                             setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
                 }
-                CreateTopicsResponseData fooData = active.createTopics(
+                CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
                     new CreateTopicsRequestData().setTopics(
                         new CreatableTopicCollection(Collections.singleton(
                             new CreatableTopic().setName("foo").setNumPartitions(-1).
@@ -516,7 +527,7 @@ public class QuorumControllerTest {
                                             iterator()))).iterator())),
                     Collections.singleton("foo")).get();
                 fooId = fooData.topics().find("foo").topicId();
-                active.allocateProducerIds(
+                active.allocateProducerIds(ANONYMOUS_CONTEXT,
                     new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
                 long snapshotLogOffset = active.beginWritingSnapshot().get();
                 reader = logEnv.waitForSnapshot(snapshotLogOffset);
@@ -554,7 +565,7 @@ public class QuorumControllerTest {
             })) {
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(
+                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
@@ -567,11 +578,11 @@ public class QuorumControllerTest {
                 }
                 for (int i = 0; i < numBrokers - 1; i++) {
                     assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
                             setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
                             setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
                 }
-                CreateTopicsResponseData fooData = active.createTopics(
+                CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
                     new CreateTopicsRequestData().setTopics(
                         new CreatableTopicCollection(Collections.singleton(
                             new CreatableTopic().setName("foo").setNumPartitions(-1).
@@ -586,7 +597,7 @@ public class QuorumControllerTest {
                                             iterator()))).iterator())),
                     Collections.singleton("foo")).get();
                 fooId = fooData.topics().find("foo").topicId();
-                active.allocateProducerIds(
+                active.allocateProducerIds(ANONYMOUS_CONTEXT,
                     new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
 
                 SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot());
@@ -610,7 +621,7 @@ public class QuorumControllerTest {
             })) {
                 QuorumController active = controlEnv.activeController();
                 for (int i = 0; i < numBrokers; i++) {
-                    BrokerRegistrationReply reply = active.registerBroker(
+                    BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
@@ -621,7 +632,7 @@ public class QuorumControllerTest {
                                 setPort(9092 + i)).iterator()))).get();
                     brokerEpochs.put(i, reply.epoch());
                     assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                        active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().
+                        active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
                             setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
                             setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
                 }
@@ -636,7 +647,7 @@ public class QuorumControllerTest {
                 while (logEnv.appendedBytes() < maxNewRecordBytes) {
                     counter += 1;
                     String topicName = String.format("foo-%s", counter);
-                    active.createTopics(new CreateTopicsRequestData().setTopics(
+                    active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
                             new CreatableTopicCollection(Collections.singleton(
                                 new CreatableTopic().setName(topicName).setNumPartitions(-1).
                                     setReplicationFactor((short) -1).
@@ -781,30 +792,32 @@ public class QuorumControllerTest {
             })) {
                 QuorumController controller = controlEnv.activeController();
                 CountDownLatch countDownLatch = controller.pause();
+                long now = controller.time().nanoseconds();
+                ControllerRequestContext context0 = new ControllerRequestContext(
+                    KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
                 CompletableFuture<CreateTopicsResponseData> createFuture =
-                    controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(0).
+                    controller.createTopics(context0, new CreateTopicsRequestData().setTimeoutMs(0).
                         setTopics(new CreatableTopicCollection(Collections.singleton(
                             new CreatableTopic().setName("foo")).iterator())),
                         Collections.emptySet());
-                long now = controller.time().nanoseconds();
                 CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
-                    controller.deleteTopics(now, Collections.singletonList(Uuid.ZERO_UUID));
+                    controller.deleteTopics(context0, Collections.singletonList(Uuid.ZERO_UUID));
                 CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
-                    controller.findTopicIds(now, Collections.singletonList("foo"));
+                    controller.findTopicIds(context0, Collections.singletonList("foo"));
                 CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
-                    controller.findTopicNames(now, Collections.singletonList(Uuid.ZERO_UUID));
+                    controller.findTopicNames(context0, Collections.singletonList(Uuid.ZERO_UUID));
                 CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
-                    controller.createPartitions(now, Collections.singletonList(
+                    controller.createPartitions(context0, Collections.singletonList(
                         new CreatePartitionsTopic()));
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
-                    controller.electLeaders(new ElectLeadersRequestData().setTimeoutMs(0).
+                    controller.electLeaders(context0, new ElectLeadersRequestData().setTimeoutMs(0).
                         setTopicPartitions(null));
                 CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
-                    controller.alterPartitionReassignments(
+                    controller.alterPartitionReassignments(context0,
                         new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).
                             setTopics(Collections.singletonList(new ReassignableTopic())));
                 CompletableFuture<ListPartitionReassignmentsResponseData> listReassignmentsFuture =
-                    controller.listPartitionReassignments(
+                    controller.listPartitionReassignments(context0,
                         new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
                 while (controller.time().nanoseconds() == now) {
                     Thread.sleep(0, 10);
@@ -840,22 +853,21 @@ public class QuorumControllerTest {
                 QuorumController controller = controlEnv.activeController();
                 CountDownLatch countDownLatch = controller.pause();
                 CompletableFuture<CreateTopicsResponseData> createFuture =
-                    controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000),
-                        Collections.emptySet());
-                long deadlineMs = controller.time().nanoseconds() + HOURS.toNanos(1);
+                    controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
+                        setTimeoutMs(120000), Collections.emptySet());
                 CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
-                    controller.deleteTopics(deadlineMs, Collections.emptyList());
+                    controller.deleteTopics(ANONYMOUS_CONTEXT, Collections.emptyList());
                 CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIdsFuture =
-                    controller.findTopicIds(deadlineMs, Collections.emptyList());
+                    controller.findTopicIds(ANONYMOUS_CONTEXT, Collections.emptyList());
                 CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNamesFuture =
-                    controller.findTopicNames(deadlineMs, Collections.emptyList());
+                    controller.findTopicNames(ANONYMOUS_CONTEXT, Collections.emptyList());
                 CompletableFuture<List<CreatePartitionsTopicResult>> createPartitionsFuture =
-                    controller.createPartitions(deadlineMs, Collections.emptyList());
+                    controller.createPartitions(ANONYMOUS_CONTEXT, Collections.emptyList());
                 CompletableFuture<ElectLeadersResponseData> electLeadersFuture =
-                    controller.electLeaders(new ElectLeadersRequestData().setTimeoutMs(120000));
+                    controller.electLeaders(ANONYMOUS_CONTEXT, new ElectLeadersRequestData());
                 CompletableFuture<AlterPartitionReassignmentsResponseData> alterReassignmentsFuture =
-                    controller.alterPartitionReassignments(
-                        new AlterPartitionReassignmentsRequestData().setTimeoutMs(12000));
+                    controller.alterPartitionReassignments(ANONYMOUS_CONTEXT,
+                        new AlterPartitionReassignmentsRequestData());
                 createFuture.get();
                 deleteFuture.get();
                 findTopicIdsFuture.get();
@@ -891,7 +903,7 @@ public class QuorumControllerTest {
                 )
                 .collect(Collectors.toList());
 
-            Uuid topicId = controller.createTopics(new CreateTopicsRequestData()
+            Uuid topicId = controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData()
                     .setTopics(new CreatableTopicCollection(Collections.singleton(new CreatableTopic()
                         .setName(topicName)
                         .setNumPartitions(-1)
@@ -930,10 +942,8 @@ public class QuorumControllerTest {
             logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
 
             int oldClaimEpoch = controller.curClaimEpoch();
-            assertThrows(
-                ExecutionException.class,
-                () -> controller.alterPartition(alterPartitionRequest).get()
-            );
+            assertThrows(ExecutionException.class,
+                () -> controller.alterPartition(ANONYMOUS_CONTEXT, alterPartitionRequest).get());
 
             // Wait for the controller to become active again
             assertSame(controller, controlEnv.activeController());
@@ -973,7 +983,7 @@ public class QuorumControllerTest {
     private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception {
         Map<Integer, Long> brokerEpochs = new HashMap<>();
         for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
-            BrokerRegistrationReply reply = controller.registerBroker(
+            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData()
                     .setBrokerId(brokerId)
                     .setRack(null)
@@ -993,7 +1003,7 @@ public class QuorumControllerTest {
             brokerEpochs.put(brokerId, reply.epoch());
 
             // Send heartbeat to unfence
-            controller.processBrokerHeartbeat(
+            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
                 new BrokerHeartbeatRequestData()
                     .setWantFence(false)
                     .setBrokerEpoch(brokerEpochs.get(brokerId))
@@ -1014,7 +1024,7 @@ public class QuorumControllerTest {
             return;
         }
         for (Integer brokerId : brokers) {
-            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(
+            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
                 new BrokerHeartbeatRequestData()
                     .setWantFence(false)
                     .setBrokerEpoch(brokerEpochs.get(brokerId))
@@ -1033,7 +1043,7 @@ public class QuorumControllerTest {
             })) {
                 QuorumController active = controlEnv.activeController();
                 registerBrokers(active, 5);
-                active.createTopics(new CreateTopicsRequestData().
+                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
                     setTopics(new CreatableTopicCollection(Collections.singleton(
                         new CreatableTopic().setName("foo").
                             setReplicationFactor((short) 3).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d095a9fe37..0bfb6da7a2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -68,9 +68,11 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.LeaderRecoveryState;
+import org.apache.kafka.metadata.MockRandom;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -135,14 +137,14 @@ public class ReplicationControlManagerTest {
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
         final ControllerMetrics metrics = new MockControllerMetrics();
-        final String clusterId = Uuid.randomUuid().toString();
-        final ClusterControlManager clusterControl = new ClusterControlManager(logContext,
-            clusterId,
-            time,
-            snapshotRegistry,
-            TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
-            new StripedReplicaPlacer(random),
-            metrics);
+        final ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+            setLogContext(logContext).
+            setTime(time).
+            setSnapshotRegistry(snapshotRegistry).
+            setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS)).
+            setReplicaPlacer(new StripedReplicaPlacer(random)).
+            setControllerMetrics(metrics).
+            build();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             build();
@@ -159,16 +161,15 @@ public class ReplicationControlManagerTest {
         }
 
         ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
-            this.replicationControl = new ReplicationControlManager(snapshotRegistry,
-                new LogContext(),
-                (short) 3,
-                1,
-                Integer.MAX_VALUE,
-                true,
-                configurationControl,
-                clusterControl,
-                metrics,
-                createTopicPolicy);
+            this.replicationControl = new ReplicationControlManager.Builder().
+                setSnapshotRegistry(snapshotRegistry).
+                setLogContext(logContext).
+                setMaxElectionsPerImbalance(Integer.MAX_VALUE).
+                setConfigurationControl(configurationControl).
+                setClusterControl(clusterControl).
+                setControllerMetrics(metrics).
+                setCreateTopicPolicy(createTopicPolicy).
+                build();
             clusterControl.activate();
         }
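
The builder conversions above replace long positional constructor calls. A self-contained, hypothetical illustration of the same pattern (WidgetManager is invented for this sketch and is not part of the Kafka code base): adding a new setting means adding one defaulted setter, so existing call sites and tests keep compiling unchanged.

    // Hypothetical illustration of the builder pattern used above; the
    // real ClusterControlManager.Builder has more fields and validation.
    final class WidgetManager {
        private final long sessionTimeoutNs;
        private final boolean metricsEnabled;

        private WidgetManager(long sessionTimeoutNs, boolean metricsEnabled) {
            this.sessionTimeoutNs = sessionTimeoutNs;
            this.metricsEnabled = metricsEnabled;
        }

        static final class Builder {
            // Defaults apply when a setter is never called, so adding a
            // new field does not break existing call sites.
            private long sessionTimeoutNs = 9_000_000_000L;
            private boolean metricsEnabled = true;

            Builder setSessionTimeoutNs(long sessionTimeoutNs) {
                this.sessionTimeoutNs = sessionTimeoutNs;
                return this;
            }

            Builder setMetricsEnabled(boolean metricsEnabled) {
                this.metricsEnabled = metricsEnabled;
                return this;
            }

            WidgetManager build() {
                return new WidgetManager(sessionTimeoutNs, metricsEnabled);
            }
        }
    }

A caller that only cares about one setting writes new WidgetManager.Builder().setMetricsEnabled(false).build(); when a third setting is introduced later, that call site compiles unchanged.
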
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockRandom.java b/metadata/src/test/java/org/apache/kafka/metadata/MockRandom.java
similarity index 96%
rename from metadata/src/test/java/org/apache/kafka/controller/MockRandom.java
rename to metadata/src/test/java/org/apache/kafka/metadata/MockRandom.java
index c42a158b66..40b2f13463 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockRandom.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MockRandom.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.metadata;
 
 import java.util.Random;
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
index 2f0fb0f6a5..19b07abed2 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.controller.ControllerRequestContext;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
@@ -69,7 +70,10 @@ public class ClusterMetadataAuthorizerTest {
         }
 
         @Override
-        public CompletableFuture<List<AclCreateResult>> createAcls(List<AclBinding> aclBindings) {
+        public CompletableFuture<List<AclCreateResult>> createAcls(
+            ControllerRequestContext context,
+            List<AclBinding> aclBindings
+        ) {
             return createAclsResponse;
         }
 
@@ -78,7 +82,10 @@ public class ClusterMetadataAuthorizerTest {
         }
 
         @Override
-        public CompletableFuture<List<AclDeleteResult>> deleteAcls(List<AclBindingFilter> aclBindingFilters) {
+        public CompletableFuture<List<AclDeleteResult>> deleteAcls(
+            ControllerRequestContext context,
+            List<AclBindingFilter> aclBindingFilters
+        ) {
             return deleteAclsResponse;
         }
     }
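
The mock above reflects the updated AclMutator signatures, where a ControllerRequestContext precedes the bindings or filters. A hedged sketch of a call site (the ControllerRequestContext constructor shape is assumed as in the earlier sketch, and `aclMutator` is assumed to be in scope):

    import java.util.Collections;
    import java.util.List;
    import java.util.OptionalLong;
    import java.util.concurrent.CompletableFuture;

    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;
    import org.apache.kafka.common.security.auth.KafkaPrincipal;
    import org.apache.kafka.controller.ControllerRequestContext;
    import org.apache.kafka.server.authorizer.AclCreateResult;

    // Grant User:alice READ on topic "foo" from any host.
    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
        new AccessControlEntry("User:alice", "*",
            AclOperation.READ, AclPermissionType.ALLOW));
    // Assumed constructor shape: (principal, optional deadline in ns).
    ControllerRequestContext context = new ControllerRequestContext(
        KafkaPrincipal.ANONYMOUS, OptionalLong.empty());
    CompletableFuture<List<AclCreateResult>> results =
        aclMutator.createAcls(context, Collections.singletonList(binding));
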
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
similarity index 89%
rename from metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
rename to metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
index c3fbb0996a..9450d40370 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java
@@ -15,18 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.controller;
+package org.apache.kafka.metadata.placement;
 
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
-import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList;
-import org.apache.kafka.controller.StripedReplicaPlacer.RackList;
-import org.apache.kafka.metadata.UsableBroker;
+import org.apache.kafka.metadata.MockRandom;
+import org.apache.kafka.metadata.placement.StripedReplicaPlacer.BrokerList;
+import org.apache.kafka.metadata.placement.StripedReplicaPlacer.RackList;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -84,6 +85,25 @@ public class StripedReplicaPlacerTest {
         assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
     }
 
+    private List<List<Integer>> place(
+        ReplicaPlacer placer,
+        int startPartition,
+        int numPartitions,
+        short replicationFactor,
+        List<UsableBroker> brokers
+    ) {
+        PlacementSpec placementSpec = new PlacementSpec(startPartition,
+            numPartitions,
+            replicationFactor);
+        ClusterDescriber cluster = new ClusterDescriber() {
+            @Override
+            public Iterator<UsableBroker> usableBrokers() {
+                return brokers.iterator();
+            }
+        };
+        return placer.place(placementSpec, cluster);
+    }
+
     /**
      * Test that we perform striped replica placement as expected for a multi-partition topic
      * on a single unfenced broker
@@ -95,9 +115,9 @@ public class StripedReplicaPlacerTest {
         assertEquals(Arrays.asList(Arrays.asList(0),
                 Arrays.asList(0),
                 Arrays.asList(0)),
-                placer.place(0, 3, (short) 1, Arrays.asList(
+                place(placer, 0, 3, (short) 1, Arrays.asList(
                         new UsableBroker(0, Optional.empty(), false),
-                        new UsableBroker(1, Optional.empty(), true)).iterator()));
+                        new UsableBroker(1, Optional.empty(), true))));
     }
 
     /**
@@ -166,9 +186,9 @@ public class StripedReplicaPlacerTest {
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
         assertEquals("All brokers are currently fenced.",
             assertThrows(InvalidReplicationFactorException.class,
-                () -> placer.place(0, 1, (short) 1, Arrays.asList(
+                () -> place(placer, 0, 1, (short) 1, Arrays.asList(
                     new UsableBroker(11, Optional.of("1"), true),
-                    new UsableBroker(10, Optional.of("1"), true)).iterator())).getMessage());
+                    new UsableBroker(10, Optional.of("1"), true)))).getMessage());
     }
 
     @Test
@@ -178,9 +198,9 @@ public class StripedReplicaPlacerTest {
         assertEquals("The target replication factor of 3 cannot be reached because only " +
             "2 broker(s) are registered.",
             assertThrows(InvalidReplicationFactorException.class,
-                () -> placer.place(0, 1, (short) 3, Arrays.asList(
+                () -> place(placer, 0, 1, (short) 3, Arrays.asList(
                     new UsableBroker(11, Optional.of("1"), false),
-                    new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+                    new UsableBroker(10, Optional.of("1"), false)))).getMessage());
     }
 
     @Test
@@ -189,9 +209,9 @@ public class StripedReplicaPlacerTest {
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
         assertEquals("Invalid replication factor 0: the replication factor must be positive.",
                 assertThrows(InvalidReplicationFactorException.class,
-                        () -> placer.place(0, 1, (short) 0, Arrays.asList(
+                        () -> place(placer, 0, 1, (short) 0, Arrays.asList(
                                 new UsableBroker(11, Optional.of("1"), false),
-                                new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
+                                new UsableBroker(10, Optional.of("1"), false)))).getMessage());
     }
 
     @Test
@@ -203,22 +223,22 @@ public class StripedReplicaPlacerTest {
                 Arrays.asList(0, 1, 2),
                 Arrays.asList(1, 2, 3),
                 Arrays.asList(1, 0, 2)),
-            placer.place(0, 5, (short) 3, Arrays.asList(
+            place(placer, 0, 5, (short) 3, Arrays.asList(
                 new UsableBroker(0, Optional.empty(), false),
                 new UsableBroker(3, Optional.empty(), false),
                 new UsableBroker(2, Optional.empty(), false),
-                new UsableBroker(1, Optional.empty(), false)).iterator()));
+                new UsableBroker(1, Optional.empty(), false))));
     }
 
     @Test
     public void testEvenDistribution() {
         MockRandom random = new MockRandom();
         StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
-        List<List<Integer>> replicas = placer.place(0, 200, (short) 2, Arrays.asList(
+        List<List<Integer>> replicas = place(placer, 0, 200, (short) 2, Arrays.asList(
             new UsableBroker(0, Optional.empty(), false),
             new UsableBroker(1, Optional.empty(), false),
             new UsableBroker(2, Optional.empty(), false),
-            new UsableBroker(3, Optional.empty(), false)).iterator());
+            new UsableBroker(3, Optional.empty(), false)));
         Map<List<Integer>, Integer> counts = new HashMap<>();
         for (List<Integer> partitionReplicas : replicas) {
             counts.put(partitionReplicas, counts.getOrDefault(partitionReplicas, 0) + 1);