You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/23 03:24:12 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,

Review comment:
       nit: might be useful to document the expectation that `resources` is a subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for `IncrementalAlterConfigsRequest.Builder` rather than a static class.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment:
       Get rid of this TODO. We do not need to remove IBP internal versions.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,

Review comment:
       nit: why don't we add a case class and make this optional. for example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are provided explicit.
   

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       In general, the forwarded request may have a different version than the client request. I'm wondering if we should keep the version the same in case there are semantic differences. As an example, a newer version of the API may introduce unexpected error codes. Unless we have logic to convert those error codes, then we might break compatibility unexpectedly.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -459,7 +459,10 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
+      val initialPrincipalName = requestContext.initialPrincipalName
+      val initialPrincipalMessage = if(initialPrincipalName != null) s", on behalf of initial principal =$initialPrincipalName," else ""

Review comment:
       nit: space after `if`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig,
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
-      if (id != this.config.brokerId)
+      // Under redirection, it is possible to handle config changes targeting at brokers other than the controller.

Review comment:
       The comment doesn't seem to make sense here. Seems like the logic doesn't have anything to do with the controller?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {

Review comment:
       nit: this is misaligned

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -117,10 +119,26 @@ class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache
                                   callback: RequestCompletionHandler): Unit = {
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
   }
+
+  private[server] def forwardRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest],
+                                     responseToOriginalClient: (RequestChannel.Request, Int => AbstractResponse,

Review comment:
       This function has 3 callbacks... It would be nice if we could figure out how to pass through the `ForwardRequestHandler` directly.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3064,12 +3272,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>

Review comment:
       It would be helpful to have a comment explaining this. It does not seem obvious.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -273,31 +275,632 @@ class KafkaApisTest {
       .setIncludeSynonyms(true)
       .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource()
         .setResourceName("topic-1")
-        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava)).build(requestHeader.apiVersion))
+        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
+      .build(requestHeader.apiVersion),
+      requestHeader = Option(requestHeader))
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeConfigsRequest(request)
 
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterClientQuotasWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithNonControllerAndRedirectionDisabled(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    // Should just handle the config change since IBP is low
+    createKafkaApis(interBrokerProtocolVersion = KAFKA_2_6_IV0,
+      authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NONE))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithRedirection(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    expectNoThrottling()
+
+    val redirectRequestBuilder = new AlterClientQuotasRequest.Builder(
+      Set(new ClientQuotaAlteration(quotaEntity, Collections.emptySet())).asJava, false)
+
+    val capturedCallback = EasyMock.newCapture[ClientResponse => AbstractResponse]()
+
+    EasyMock.expect(redirectionManager.forwardRequest(
+      EasyMock.eq(redirectRequestBuilder),
+      anyObject[(RequestChannel.Request, Int => AbstractResponse,
+        Option[Send => Unit]) => Unit](),
+      EasyMock.eq(request),
+      EasyMock.capture(capturedCallback),
+      anyObject()
+    )).once()
+
+    val clientResponse: ClientResponse = EasyMock.createNiceMock(classOf[ClientResponse])
+    val alterClientQuotasResponse = new AlterClientQuotasResponse(
+      Map(quotaEntity -> ApiError.NONE).asJava, 10
+    )
+    EasyMock.expect(clientResponse.responseBody).andReturn(alterClientQuotasResponse)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel,
+      authorizer, controller, redirectionManager, clientResponse)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    assertEquals(alterClientQuotasResponse, capturedCallback.getValue.apply(clientResponse))
+
+    EasyMock.verify(controller, redirectionManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequestWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NOT_CONTROLLER))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequest(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+    // As a forwarding request, we would use CLUSTER_ACTION to do a separate round of auth.
+    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map( quotaEntity -> Errors.BROKER_AUTHORIZATION_FAILURE))
+
+    verify(authorizer, adminManager)
+  }
+
+  private def verifyAlterClientQuotaResult(alterClientQuotasRequest: AlterClientQuotasRequest,
+                                           capturedResponse: Capture[RequestChannel.Response],
+                                           expected: Map[ClientQuotaEntity, Errors]): Unit = {
+    val response = readResponse(ApiKeys.ALTER_CLIENT_QUOTAS, alterClientQuotasRequest, capturedResponse)
+      .asInstanceOf[AlterClientQuotasResponse]
+    val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
+    response.complete(futures.asJava)
+    futures.foreach {
+      case (entity, future) =>
+        future.whenComplete((_, thrown) =>
+          assertEquals(thrown, expected(entity).exception())
+        ).isDone
+    }
+  }
+
+  @Test
+  def testCreateTopicsWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.CREATE
+    val topicName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    authorizeResource(authorizer, operation, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    authorizeResource(authorizer, AclOperation.DESCRIBE_CONFIGS, ResourceType.TOPIC,
+      topicName, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    expectNoThrottling()
+
+    val topicsAuthorized = new CreateTopicsRequestData.CreatableTopicCollection(1)
+    val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
+      .setName(topicName)
+    topicsAuthorized.add(topicToCreate)
+
+    val timeout = 10
+    val request = buildRequest(new CreateTopicsRequest.Builder(new CreateTopicsRequestData()
+      .setTimeoutMs(timeout)
+      .setValidateOnly(false)
+      .setTopics(topicsAuthorized))
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.eq(request), EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
+
+    EasyMock.expect(adminManager.createTopics(
+      EasyMock.eq(timeout),
+      EasyMock.eq(false),
+      EasyMock.eq(Map(topicName -> topicToCreate)),
+      anyObject(),
+      EasyMock.eq(UnboundedControllerMutationQuota),
+      anyObject()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, authorizer, adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request)
+
+    verify(authorizer, adminManager, clientControllerQuotaManager)
+  }
+
+  @Test
+  def testCreateTopicsWithNonControllerAndRedirectionDisabled(): Unit = {

Review comment:
       Good to see the unit tests in here. I think we also need at least a couple integration tests. For example, could we add something to `CreateTopicsRequestTest` to ensure that forwarding works as expected?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1733,68 +1817,109 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+    val forwardRequestHandler = new ForwardRequestHandler[CreateTopicsRequest,
+      CreateTopicsResponse, String, CreatableTopic](request) {
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      override def resourceSplitByAuthorization(createTopicsRequest: CreateTopicsRequest):
+      (Map[String, CreatableTopic], Map[String, ApiError]) = {

Review comment:
       nit: this is subjective, but this style is a bit ugly. I would prefer the following:
   ```scala
   override def resourceSplitByAuthorization(
     createTopicsRequest: CreateTopicsRequest
   ): (Map[String, CreatableTopic], Map[String, ApiError]) = {
   ```
   That makes it easier visually to separate the return type and the function logic (again, in my opinion).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),
+          sendResponseMaybeThrottle,
+          request,
+          response => {
+            mergeResponse(response.responseBody.asInstanceOf[R], unauthorizedResources)
+          })
+      } else {
+        // When IBP is smaller than 2.7, forwarding is not supported,
+        // therefore requests are handled directly
+        process(authorizedResources, unauthorizedResources, requestBody)

Review comment:
       We can't guarantee that this broker will still be the controller when we call `process` or that the broker we're forwarding to will still be the controller when it receives the request. In these cases, we need to return some retriable error to the client. Can you help me understand how this is implemented?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
-                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+  private def alterBrokerConfigs(resource: ConfigResource,
+                                 validateOnly: Boolean,
+                                 configProps: Properties,
+                                 configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val brokerId = getBrokerId(resource)
     val perBrokerConfig = brokerId.nonEmpty
     this.config.dynamicConfig.validate(configProps, perBrokerConfig)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      if (perBrokerConfig)
+      if (perBrokerConfig) {
+        val previousConfigProps = config.dynamicConfig.currentDynamicBrokerConfigs
         this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+        this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, previousConfigProps)

Review comment:
       Can you explain why this change is needed?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {

Review comment:
       nit: seems `handle` doesn't really need to be part of `ForwardRequestHandler`. Instead we could pull it out:
   ```scala
   private def handle(handler: ForwardRequestHandler): Unit = {
   ...
   ```
   The advantage of this is that it allows us to pull the type out of `KafkaApis` without inheriting all of the dependencies that are needed by `handle`.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
##########
@@ -25,23 +25,35 @@
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IncrementalAlterConfigsResponse extends AbstractResponse {
 
-    public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
-                                                                     final Map<ConfigResource, ApiError> results) {
-        IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
-        responseData.setThrottleTimeMs(requestThrottleMs);
-        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-            responseData.responses().add(new AlterConfigsResourceResponse().
-                    setResourceName(entry.getKey().name()).
-                    setResourceType(entry.getKey().type().id()).
-                    setErrorCode(entry.getValue().error().code()).
-                    setErrorMessage(entry.getValue().message()));
-        }
-        return responseData;
+    public IncrementalAlterConfigsResponse(final int requestThrottleMs,
+                                           final Map<ConfigResource, ApiError> results) {
+        this.data = new IncrementalAlterConfigsResponseData()
+                        .setThrottleTimeMs(requestThrottleMs);
+
+        addResults(results);
+    }
+
+    public IncrementalAlterConfigsResponse addResults(final Map<ConfigResource, ApiError> results) {

Review comment:
       Typically responses are immutable after construction. It seems kind of a brittle pattern to rely on being able to mutate the response we receive from the other broker. For example we inherit the throttle time which is a bit weird. Are we saving that much by not creating a new response?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org