Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:34:38 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9103: Add redirection for (Incremental)AlterConfig

abbccdda opened a new pull request #9103:
URL: https://github.com/apache/kafka/pull/9103


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493206068



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,

Review comment:
       The primary reason is that we would trigger the disallowed import if we do it in the request builder:
   ```
   [ant:checkstyle] [ERROR] /Users/boyang.chen/code/kafka/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java:20:1: Disallowed import - org.apache.kafka.clients.admin.AlterConfigOp. [ImportControl]
   ```
    Let me check if we can make an exception here.
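    For reference, checkstyle ImportControl rules are declared in the project's import-control XML (Kafka keeps one under `checkstyle/`); an exception would be a narrowly scoped `allow` entry. A rough sketch, assuming the standard import-control DTD — the exact subpackage nesting here is an assumption, not a quote of Kafka's file:
    ```xml
    <subpackage name="common">
      <subpackage name="requests">
        <!-- hypothetical exception: admit only the single admin class the builder needs -->
        <allow class="org.apache.kafka.clients.admin.AlterConfigOp" />
      </subpackage>
    </subpackage>
    ```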







[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493771267



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + configName
+          if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       Yea
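    For context, the augment/trim pair above relies on a path-equivalence trick: doubling the separators yields a string that is textually different but, on most platforms, names the same file, so the value registers as a config change without actually moving the keystore. A minimal sketch of the invariant (illustrative only, not the actual patch):
    ```scala
    // Illustration only: the doubled-separator form differs textually from the
    // original, so the dynamic-config comparison sees a "change", yet both
    // strings resolve to the same file on disk on most platforms.
    val original  = "/etc/kafka/keystore.jks"
    val augmented = original.replace("/", "//")       // "//etc//kafka//keystore.jks"
    assert(augmented != original)                     // registers as a config change
    assert(augmented.replace("//", "/") == original)  // trims back to the same path
    ```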







[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493773324



##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
     if (brokerId == ConfigEntityName.Default)
       brokerConfig.dynamicConfig.updateDefaultConfig(properties)
     else if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
+      val persistentProps = brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = true)
+      // The file path was changed to an equivalent replacement, which means we should reload
+      if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) {
+        brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps)
+      }

Review comment:
       I feel it's more explicit to do it here, as the ZK notification is the only targeted case.







[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       Quotas are one aspect of this work that needs more consideration. What we don't want is for the inter-broker channel to get affected by the individual client throttle, which is what will happen with the current patch. What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but we set the envelope throttle time to 0 and ensure that the inter-broker channel does not get throttled.
    
    For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles.
   
   One challenging aspect is how this will affect quota metrics. Currently quota/throttling metrics are relatively simple because they are recorded separately by each broker. However, here the controller is the one that is tracking the throttling for the client across multiple inbound connections from multiple brokers. This means that the broker that is applying a throttle for a forwarded request may not have actually observed a quota violation. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this.
   
   cc @apovzner @rajinisivaram 
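    A rough sketch of how `sendResponseMaybeThrottle` might branch for forwarded requests (illustrative only; `request.isForwarded` and the exact quota-manager calls are assumptions, not the actual patch):
    ```scala
    // Sketch: record the client's quota sample and return the computed throttle
    // inside the wrapped response body, but skip the channel throttle on the
    // controller; the forwarding broker applies it when the response comes back.
    def sendResponseMaybeThrottle(request: RequestChannel.Request,
                                  createResponse: Int => AbstractResponse): Unit = {
      val throttleTimeMs =
        quotas.request.maybeRecordAndGetThrottleTimeMs(request, time.milliseconds())
      if (!request.isForwarded) // assumed flag marking envelope-forwarded requests
        quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
      // the throttle value still travels back inside the wrapped response,
      // while the envelope's own throttle time stays 0
      sendResponse(request, Some(createResponse(throttleTimeMs)), None)
    }
    ```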







[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516933962



##########
File path: core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
##########
@@ -54,8 +55,15 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
 
   private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
                                         authorizer: Option[String] = None): Unit = {
+    val brokerConfigs = (0 until 3).map { node =>

Review comment:
       Do we need this change anymore?

##########
File path: core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
##########
@@ -104,7 +123,7 @@ class ApiVersionTest {
       apiVersion.id
     })
 
-    val uniqueIds: Set[Int] = allIds.toSet
+    val uniqueIds: Predef.Set[Int] = allIds.toSet

Review comment:
       nit: is this change needed?

##########
File path: core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
##########
@@ -360,6 +360,8 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
 
     // Test that we cannot create or delete ACLs when ALTER is denied.
     authorizationAdmin.addClusterAcl(DENY, ALTER)
+    // CLUSTER_ACTION shall also be forbidden since the request goes to the privileged listener
+    authorizationAdmin.addClusterAcl(DENY, CLUSTER_ACTION)

Review comment:
       Is this change needed? I am not sure I follow the comment about the privileged listener. That shouldn't affect ACLs I think.

##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationWithForwardingIntegrationTest.scala
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import org.junit.Test
+
+/**
+ * Integration test suite for forwarding mechanism applied on AlterConfigs.
+ * This class basically reuses everything from {@link DynamicBrokerReconfigurationTest}
+ * with the KIP-500 mode enabled for the SASL listener alter test.
+ */
+class DynamicBrokerReconfigurationWithForwardingIntegrationTest extends DynamicBrokerReconfigurationTest {

Review comment:
       This inherits all tests from `DynamicBrokerReconfigurationTest`, which doesn't look intended. Can we just remove it? We can add it back once we get to testing the SSL path changes. For now I think the simple integration test for CreateTopics is good enough.
   
   (By the way, it's curious that `testTrustStoreAlter` still passes even after we have removed the path update logic.)

##########
File path: core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
##########
@@ -45,6 +45,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
   override def modifyConfigs(props: Seq[Properties]): Unit = {
     props.foreach { p =>
       p.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+      p.put(KafkaConfig.EnableMetadataQuorumProp, "true")

Review comment:
       I don't think we want to make this the default until we are ready to enable it. I would suggest we create a new `ForwardRequestTest` which extends `BaseRequestTest`. Then we can move the test case from `CreateTopicsRequestTest`.
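    Something along these lines, as a hypothetical sketch (the class does not exist yet; the config key is taken from the diff above):
    ```scala
    import java.util.Properties

    // Hypothetical: opt into the forwarding/quorum mode only for this suite,
    // leaving the BaseRequestTest defaults untouched.
    class ForwardRequestTest extends BaseRequestTest {
      override def modifyConfigs(props: Seq[Properties]): Unit = {
        super.modifyConfigs(props)
        props.foreach(_.put(KafkaConfig.EnableMetadataQuorumProp, "true"))
      }
      // the CreateTopics test case from CreateTopicsRequestTest would move here
    }
    ```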







[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-713007022


   retest this please





[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-718051602


   retest this please





[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493782410



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
##########
@@ -25,23 +25,35 @@
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IncrementalAlterConfigsResponse extends AbstractResponse {
 
-    public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
-                                                                     final Map<ConfigResource, ApiError> results) {
-        IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
-        responseData.setThrottleTimeMs(requestThrottleMs);
-        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-            responseData.responses().add(new AlterConfigsResourceResponse().
-                    setResourceName(entry.getKey().name()).
-                    setResourceType(entry.getKey().type().id()).
-                    setErrorCode(entry.getValue().error().code()).
-                    setErrorMessage(entry.getValue().message()));
-        }
-        return responseData;
+    public IncrementalAlterConfigsResponse(final int requestThrottleMs,
+                                           final Map<ConfigResource, ApiError> results) {
+        this.data = new IncrementalAlterConfigsResponseData()
+                        .setThrottleTimeMs(requestThrottleMs);
+
+        addResults(results);
+    }
+
+    public IncrementalAlterConfigsResponse addResults(final Map<ConfigResource, ApiError> results) {

Review comment:
       I guess we could get rid of it and do the merge at the caller level.







[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513757978



##########
File path: clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
##########
@@ -59,7 +59,8 @@ public void testResponseThrottleTime() {
         for (ApiKeys apiKey: ApiKeys.values()) {
             Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
             BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
-            if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+            // Envelope could be throttled, even though it requires cluster action.
+            if (apiKey != ApiKeys.ENVELOPE && (apiKey.clusterAction || authenticationKeys.contains(apiKey)))

Review comment:
       Sounds good, will remove the throttle time field from the Envelope
   







[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,

Review comment:
       nit: might be useful to document the expectation that `resources` is a subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for `IncrementalAlterConfigsRequest.Builder` rather than a static class.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
    // Bump Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment:
       Get rid of this TODO. We do not need to remove IBP internal versions.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,

Review comment:
       nit: why don't we add a case class and make this optional? For example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are provided explicit.
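    Applied to the signature in the diff above, the suggestion might look like this sketch:
    ```scala
    case class InitialPrincipal(name: String, clientId: String)

    case class RequestAndCompletionHandler(destination: Node,
                                           request: AbstractRequest.Builder[_ <: AbstractRequest],
                                           handler: RequestCompletionHandler,
                                           initialPrincipal: Option[InitialPrincipal] = None)
    ```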
   

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       In general, the forwarded request may have a different version than the client request. I'm wondering if we should keep the version the same in case there are semantic differences. As an example, a newer version of the API may introduce unexpected error codes. Unless we have logic to convert those error codes, then we might break compatibility unexpectedly.
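    One way to keep the semantics aligned would be to pin the forwarded request to the version the client originally sent; a one-line sketch, assuming the builder's versioned `build` overload:
    ```scala
    // build the controller-bound request at the exact version the client used,
    // so error codes from a newer version cannot leak back to an older client
    val forwardedRequest = createRequestBuilder(authorizedResources, requestBody)
      .build(request.header.apiVersion)
    ```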

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -459,7 +459,10 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
+      val initialPrincipalName = requestContext.initialPrincipalName
+      val initialPrincipalMessage = if(initialPrincipalName != null) s", on behalf of initial principal =$initialPrincipalName," else ""

Review comment:
       nit: space after `if`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig,
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
-      if (id != this.config.brokerId)
+      // Under redirection, it is possible to handle config changes targeting at brokers other than the controller.

Review comment:
       The comment doesn't seem to make sense here. Seems like the logic doesn't have anything to do with the controller?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {

Review comment:
       nit: this is misaligned

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -117,10 +119,26 @@ class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache
                                   callback: RequestCompletionHandler): Unit = {
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
   }
+
+  private[server] def forwardRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest],
+                                     responseToOriginalClient: (RequestChannel.Request, Int => AbstractResponse,

Review comment:
       This function has 3 callbacks... It would be nice if we could figure out how to pass through the `ForwardRequestHandler` directly.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3064,12 +3272,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>

Review comment:
       It would be helpful to have a comment explaining this. It does not seem obvious.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -273,31 +275,632 @@ class KafkaApisTest {
       .setIncludeSynonyms(true)
       .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource()
         .setResourceName("topic-1")
-        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava)).build(requestHeader.apiVersion))
+        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
+      .build(requestHeader.apiVersion),
+      requestHeader = Option(requestHeader))
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeConfigsRequest(request)
 
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterClientQuotasWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithNonControllerAndRedirectionDisabled(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    // Should just handle the config change since IBP is low
+    createKafkaApis(interBrokerProtocolVersion = KAFKA_2_6_IV0,
+      authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NONE))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithRedirection(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    expectNoThrottling()
+
+    val redirectRequestBuilder = new AlterClientQuotasRequest.Builder(
+      Set(new ClientQuotaAlteration(quotaEntity, Collections.emptySet())).asJava, false)
+
+    val capturedCallback = EasyMock.newCapture[ClientResponse => AbstractResponse]()
+
+    EasyMock.expect(redirectionManager.forwardRequest(
+      EasyMock.eq(redirectRequestBuilder),
+      anyObject[(RequestChannel.Request, Int => AbstractResponse,
+        Option[Send => Unit]) => Unit](),
+      EasyMock.eq(request),
+      EasyMock.capture(capturedCallback),
+      anyObject()
+    )).once()
+
+    val clientResponse: ClientResponse = EasyMock.createNiceMock(classOf[ClientResponse])
+    val alterClientQuotasResponse = new AlterClientQuotasResponse(
+      Map(quotaEntity -> ApiError.NONE).asJava, 10
+    )
+    EasyMock.expect(clientResponse.responseBody).andReturn(alterClientQuotasResponse)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel,
+      authorizer, controller, redirectionManager, clientResponse)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    assertEquals(alterClientQuotasResponse, capturedCallback.getValue.apply(clientResponse))
+
+    EasyMock.verify(controller, redirectionManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequestWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NOT_CONTROLLER))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequest(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+    // As a forwarding request, we would use CLUSTER_ACTION to do a separate round of auth.
+    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map( quotaEntity -> Errors.BROKER_AUTHORIZATION_FAILURE))
+
+    verify(authorizer, adminManager)
+  }
+
+  private def verifyAlterClientQuotaResult(alterClientQuotasRequest: AlterClientQuotasRequest,
+                                           capturedResponse: Capture[RequestChannel.Response],
+                                           expected: Map[ClientQuotaEntity, Errors]): Unit = {
+    val response = readResponse(ApiKeys.ALTER_CLIENT_QUOTAS, alterClientQuotasRequest, capturedResponse)
+      .asInstanceOf[AlterClientQuotasResponse]
+    val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
+    response.complete(futures.asJava)
+    futures.foreach {
+      case (entity, future) =>
+        future.whenComplete((_, thrown) =>
+          assertEquals(thrown, expected(entity).exception())
+        ).isDone
+    }
+  }
+
+  @Test
+  def testCreateTopicsWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.CREATE
+    val topicName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    authorizeResource(authorizer, operation, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    authorizeResource(authorizer, AclOperation.DESCRIBE_CONFIGS, ResourceType.TOPIC,
+      topicName, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    expectNoThrottling()
+
+    val topicsAuthorized = new CreateTopicsRequestData.CreatableTopicCollection(1)
+    val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
+      .setName(topicName)
+    topicsAuthorized.add(topicToCreate)
+
+    val timeout = 10
+    val request = buildRequest(new CreateTopicsRequest.Builder(new CreateTopicsRequestData()
+      .setTimeoutMs(timeout)
+      .setValidateOnly(false)
+      .setTopics(topicsAuthorized))
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.eq(request), EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
+
+    EasyMock.expect(adminManager.createTopics(
+      EasyMock.eq(timeout),
+      EasyMock.eq(false),
+      EasyMock.eq(Map(topicName -> topicToCreate)),
+      anyObject(),
+      EasyMock.eq(UnboundedControllerMutationQuota),
+      anyObject()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, authorizer, adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request)
+
+    verify(authorizer, adminManager, clientControllerQuotaManager)
+  }
+
+  @Test
+  def testCreateTopicsWithNonControllerAndRedirectionDisabled(): Unit = {

Review comment:
       Good to see the unit tests in here. I think we also need at least a couple integration tests. For example, could we add something to `CreateTopicsRequestTest` to ensure that forwarding works as expected?
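    For example, a test along these lines could live in `CreateTopicsRequestTest` or the suggested `ForwardRequestTest` (hypothetical sketch; `connectAndReceive` and `servers` are assumed from the test harness):
    ```scala
    @Test
    def testCreateTopicsForwardedToController(): Unit = {
      // send to a broker that is NOT the controller; with forwarding enabled
      // the topic should still be created successfully
      val nonController = servers.find(!_.kafkaController.isActive).get
      val topics = new CreateTopicsRequestData.CreatableTopicCollection()
      topics.add(new CreateTopicsRequestData.CreatableTopic()
        .setName("forwarded-topic").setNumPartitions(1).setReplicationFactor(1.toShort))
      val request = new CreateTopicsRequest.Builder(
        new CreateTopicsRequestData().setTimeoutMs(10000).setTopics(topics)).build()
      val response = connectAndReceive[CreateTopicsResponse](request, nonController.socketServer)
      assertEquals(Errors.NONE.code, response.data.topics.find("forwarded-topic").errorCode)
    }
    ```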

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1733,68 +1817,109 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+    val forwardRequestHandler = new ForwardRequestHandler[CreateTopicsRequest,
+      CreateTopicsResponse, String, CreatableTopic](request) {
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      override def resourceSplitByAuthorization(createTopicsRequest: CreateTopicsRequest):
+      (Map[String, CreatableTopic], Map[String, ApiError]) = {

Review comment:
       nit: this is subjective, but this style is a bit ugly. I would prefer the following:
   ```scala
   override def resourceSplitByAuthorization(
     createTopicsRequest: CreateTopicsRequest
   ): (Map[String, CreatableTopic], Map[String, ApiError]) = {
   ```
   That makes it easier visually to separate the return type and the function logic (again, in my opinion).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),
+          sendResponseMaybeThrottle,
+          request,
+          response => {
+            mergeResponse(response.responseBody.asInstanceOf[R], unauthorizedResources)
+          })
+      } else {
+        // When IBP is smaller than 2.7, forwarding is not supported,
+        // therefore requests are handled directly
+        process(authorizedResources, unauthorizedResources, requestBody)

Review comment:
       We can't guarantee that this broker will still be the controller when we call `process` or that the broker we're forwarding to will still be the controller when it receives the request. In these cases, we need to return some retriable error to the client. Can you help me understand how this is implemented?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
-                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+  private def alterBrokerConfigs(resource: ConfigResource,
+                                 validateOnly: Boolean,
+                                 configProps: Properties,
+                                 configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val brokerId = getBrokerId(resource)
     val perBrokerConfig = brokerId.nonEmpty
     this.config.dynamicConfig.validate(configProps, perBrokerConfig)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      if (perBrokerConfig)
+      if (perBrokerConfig) {
+        val previousConfigProps = config.dynamicConfig.currentDynamicBrokerConfigs
         this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+        this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, previousConfigProps)

Review comment:
       Can you explain why this change is needed?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {

Review comment:
       nit: seems `handle` doesn't really need to be part of `ForwardRequestHandler`. Instead we could pull it out:
   ```scala
   private def handle(handler: ForwardRequestHandler): Unit = {
   ...
   ```
   The advantage of this is that it allows us to pull the type out of `KafkaApis` without inheriting all of the dependencies that are needed by `handle`.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
##########
@@ -25,23 +25,35 @@
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IncrementalAlterConfigsResponse extends AbstractResponse {
 
-    public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
-                                                                     final Map<ConfigResource, ApiError> results) {
-        IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
-        responseData.setThrottleTimeMs(requestThrottleMs);
-        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-            responseData.responses().add(new AlterConfigsResourceResponse().
-                    setResourceName(entry.getKey().name()).
-                    setResourceType(entry.getKey().type().id()).
-                    setErrorCode(entry.getValue().error().code()).
-                    setErrorMessage(entry.getValue().message()));
-        }
-        return responseData;
+    public IncrementalAlterConfigsResponse(final int requestThrottleMs,
+                                           final Map<ConfigResource, ApiError> results) {
+        this.data = new IncrementalAlterConfigsResponseData()
+                        .setThrottleTimeMs(requestThrottleMs);
+
+        addResults(results);
+    }
+
+    public IncrementalAlterConfigsResponse addResults(final Map<ConfigResource, ApiError> results) {

Review comment:
       Typically responses are immutable after construction. It seems like a brittle pattern to rely on being able to mutate the response we receive from the other broker. For example, we inherit the throttle time, which is a bit weird. Are we saving that much by not creating a new response?
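
   For illustration, a minimal merge that builds a fresh response (assuming the generated `data.responses` accessors shown elsewhere in this PR; `asJava` comes from `scala.jdk.CollectionConverters`):
   ```scala
   // Sketch: construct a new response instead of mutating the forwarded one, and
   // choose the throttle time explicitly rather than inheriting the controller's.
   def mergeResponse(forwarded: IncrementalAlterConfigsResponse,
                     unauthorized: Map[ConfigResource, ApiError],
                     throttleTimeMs: Int): IncrementalAlterConfigsResponse = {
     val results = scala.collection.mutable.Map.empty[ConfigResource, ApiError]
     forwarded.data.responses.forEach { r =>
       results += new ConfigResource(ConfigResource.Type.forId(r.resourceType), r.resourceName) ->
         new ApiError(Errors.forCode(r.errorCode), r.errorMessage)
     }
     results ++= unauthorized
     new IncrementalAlterConfigsResponse(throttleTimeMs, results.asJava)
   }
   ```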




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464241946



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -34,6 +34,8 @@
     private final boolean expectResponse;
     private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
+    private final String initialPrincipalName;
+    private final String initialClientId;

Review comment:
       Could we use Optional for these two as they are not always provided?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,
+                                                                                    final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData()
+                                                      .setValidateOnly(validateOnly);
+        for (ConfigResource resource : resources) {
+            IncrementalAlterConfigsRequestData.AlterableConfigCollection alterableConfigSet =
+                new IncrementalAlterConfigsRequestData.AlterableConfigCollection();
+            for (AlterConfigOp configEntry : configs.get(resource))
+                alterableConfigSet.add(new IncrementalAlterConfigsRequestData.AlterableConfig()
+                                           .setName(configEntry.configEntry().name())
+                                           .setValue(configEntry.configEntry().value())
+                                           .setConfigOperation(configEntry.opType().id()));
+            IncrementalAlterConfigsRequestData.AlterConfigsResource alterConfigsResource = new IncrementalAlterConfigsRequestData.AlterConfigsResource();
+            alterConfigsResource.setResourceType(resource.type().id())
+                .setResourceName(resource.name()).setConfigs(alterableConfigSet);
+            data.resources().add(alterConfigsResource);
+

Review comment:
       nit: empty line could be removed.

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,6 +37,12 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." }
+      "flexibleVersions": "none", "about": "The client ID string." },
+    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",
+      "nullableVersions": "2+", "default": "null", "ignorable": true,
+      "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging purpose." },

Review comment:
       Actually, we will also use it for quotas. I think we could say that both `InitialPrincipalName` and `InitialClientId` will be used for logging and quota purposes.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -145,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,
+                                       initialClientId: String = null)

Review comment:
       Shall we use Option here?
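
   For example, something like this (same fields as the diff above, just with `Option`):
   ```scala
   // Sketch: model the optional forwarding metadata with Option instead of nulls.
   case class RequestAndCompletionHandler(destination: Node,
                                          request: AbstractRequest.Builder[_ <: AbstractRequest],
                                          handler: RequestCompletionHandler,
                                          initialPrincipalName: Option[String] = None,
                                          initialClientId: Option[String] = None)
   ```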

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()

Review comment:
       It looks like we will propagate the `NOT_CONTROLLER` error back to the client. Is that intentional? Since clients don't send this request to the controller (and new ones won't get the controller id anymore), it seems odd to return this error to them. We could perhaps return a more generic error instead.
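
   Purely as an illustration, the merge step could translate it before replying (`UNKNOWN_SERVER_ERROR` is just one candidate):
   ```scala
   // Illustrative only: NOT_CONTROLLER is meaningless to a client that never
   // targeted the controller, so map it to a generic error before responding.
   val clientFacingResults = results.map { case (resource, error) =>
     val mapped =
       if (error.error == Errors.NOT_CONTROLLER) new ApiError(Errors.UNKNOWN_SERVER_ERROR, null)
       else error
     resource -> mapped
   }
   ```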

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),

Review comment:
       Have we considered using Scala functions as callbacks? It would be more aligned with the other callbacks that we have in Scala, and it would also avoid having to define a class for each handler that supports forwarding. What do you think?
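
   A sketch of what that could look like, assuming the handler parameter is the SAM interface `RequestCompletionHandler` so a Scala lambda converts directly (`mergeWithUnauthorized` is a hypothetical helper):
   ```scala
   brokerToControllerChannelManager.sendRequest(
     redirectRequestBuilder,
     (clientResponse: ClientResponse) => {
       val forwarded = clientResponse.responseBody.asInstanceOf[IncrementalAlterConfigsResponse]
       sendResponseCallback(mergeWithUnauthorized(forwarded, unauthorizedResources))
     },
     request.header.initialPrincipalName,
     request.header.initialClientId)
   ```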

##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -51,7 +55,9 @@ public ClientRequest(String destination,
                          long createdTimeMs,
                          boolean expectResponse,
                          int requestTimeoutMs,
-                         RequestCompletionHandler callback) {
+                         RequestCompletionHandler callback,

Review comment:
       nit: I would keep the callback as the last argument; that position is a bit more natural for it.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,
+                         short requestVersion,
+                         String clientId,
+                         int correlationId,
+                         String initialPrincipalName,
+                         String initialClientId) {

Review comment:
       Shall we use Optional here as well?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -47,13 +48,15 @@ public RequestContext(RequestHeader header,
                           KafkaPrincipal principal,
                           ListenerName listenerName,
                           SecurityProtocol securityProtocol,
-                          ClientInformation clientInformation) {
+                          ClientInformation clientInformation,
+                          boolean fromControlPlane) {
         this.header = header;
         this.connectionId = connectionId;
         this.clientAddress = clientAddress;
         this.principal = principal;
         this.listenerName = listenerName;
         this.securityProtocol = securityProtocol;
+        this.fromControlPlane = fromControlPlane;

Review comment:
       nit: Could we move it after `clientInformation` to keep the field order in line with the order in the constructor?

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -100,7 +100,9 @@ object ApiVersion {
     // Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570)
     KAFKA_2_6_IV0,
     // Introduced feature versioning support (KIP-584)
-    KAFKA_2_7_IV0
+    KAFKA_2_7_IV0,
+    // Introduced redirection support (KIP-590)
+    KAFKA_2_7_IV1

Review comment:
       As 2.7 has not been released yet, we don't need to introduce a new version. We can reuse `KAFKA_2_7_IV0`.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##########
@@ -71,12 +71,12 @@ public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
             Objects.requireNonNull(configs, "configs");
             for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
                 AlterConfigsRequestData.AlterConfigsResource resource = new AlterConfigsRequestData.AlterConfigsResource()
-                        .setResourceName(entry.getKey().name())
-                        .setResourceType(entry.getKey().type().id());
+                                                                            .setResourceName(entry.getKey().name())
+                                                                            .setResourceType(entry.getKey().type().id());

Review comment:
       I personally prefer the previous indentation, which is, I believe, more common in our code base. Or do we plan to adopt a new formatting style?

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -309,7 +310,10 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int,
+                     val metricNamePrefix : String,

Review comment:
       nit: That was already present before your change, but could we remove the extra space before the colon?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map {

Review comment:
       nit: Remove extra space before `authorizedResources`.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -458,7 +459,7 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
+      s"[Principal = $principal, Initial Principal Name = $initialPrincipalName]: is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"

Review comment:
       The usage of square brackets and the colon looks weird here; the audit log no longer reads like a sentence. I wonder if we could go with something like this instead: `Principal = A on behalf of Principal = B is allowed...`. We could also include the initial principal name only when it is set.
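
   For example (modeling the initial principal as an `Option` here for clarity; the PR currently carries it as a nullable `String`):
   ```scala
   // Sketch: keep the audit entry a sentence and only mention the initial
   // principal when it is actually set.
   val principalDesc = initialPrincipalName match {
     case Some(initial) => s"Principal = $principal on behalf of Principal = $initial"
     case None          => s"Principal = $principal"
   }
   s"$principalDesc is $authResult Operation = $operation from host = $host " +
     s"on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
   ```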

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {

Review comment:
       Just for my understanding: I suppose we don't verify that redirection is enabled here so that the controller can accept forwarded requests as soon as any broker in the cluster is configured with IBP 2.7. Am I getting this right?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request even if we are not the controller,
+      // as admin client doesn't know how to find the controller.

Review comment:
       nit: `as admin client doesn't know how to find the controller` is no longer relevant. What about the following: `When the IBP is smaller than XYZ, forwarding is not supported, therefore requests are handled directly`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2982,12 +3091,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+            requestContext.fromControlPlane &&

Review comment:
       * I presume that this does not work if we use the same listener for both the control plane and the data plane.
   * I also wonder if it is a good thing to have this extension here, as it applies to all authorization in the API layer. I think we should be cautious and only do this for forwarded requests.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) =>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as admin client doesn't know
+      // how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
       I presume that this does not work if the broker uses the same listener for the control plane and the data plane.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493795436



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       It's a bit hard, since we pass the requestBuilder all the way down to NetworkClient; using a designated version to build the request may involve some non-trivial changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511007228



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {

Review comment:
       For point 2: if forwarding is not enabled on the active controller but it has the capability, should we just serve the request?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504341071



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
##########
@@ -70,9 +70,12 @@ public int throttleTimeMs() {
     @Override
     public Map<Errors, Integer> errorCounts() {
         HashMap<Errors, Integer> counts = new HashMap<>();
-        data.topics().forEach(result ->
-            updateErrorCounts(counts, Errors.forCode(result.errorCode()))
-        );
+        for (CreateTopicsResponseData.CreatableTopicResult result : data.topics()) {
+            Errors error = Errors.forCode(result.errorCode());
+            if (error != Errors.NONE) {

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-10607




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516363883



##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##########
@@ -70,12 +74,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-object DynamicBrokerReconfigurationTest {
-  val SecureInternal = "INTERNAL"
-  val SecureExternal = "EXTERNAL"
-}
-
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+@RunWith(value = classOf[Parameterized])
+class DynamicBrokerReconfigurationTest(quorumBasedController: JBoolean) extends ZooKeeperTestHarness with SaslSetup {

Review comment:
       It is quite expensive to parameterize these test cases. I am not sure it is worthwhile. If forwarding works for one of these cases, why would the others be different? Since we are not planning to enable this feature yet, I think unit tests in `KafkaApisTest` and maybe one integration test are good enough.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val enableMetadataQuorumProp = "enable.metadata.quorum"

Review comment:
       nit: every other property name uses a capital first letter

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the purpose of inter-broker forwarding.
+ * Any serialization/deserialization failure should raise a {@link SerializationException} to be consistent.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);

Review comment:
       Can you add a javadoc for these methods and mention `@throws SerializationException`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leaving the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    val originalClientAddress = InetAddress.getByAddress(envelopeRequest.clientAddress)
+    val originalContext = new RequestContext(originalHeader, connectionId,
+      originalClientAddress, originalPrincipal, listenerName,
+      securityProtocol, context.clientInformation, isPrivilegedListener)
+
+    val envelopeContext = new EnvelopeContext(
+      brokerContext = context,
+      receive.payload)
+
+    new network.RequestChannel.Request(processor = id, context = originalContext,

Review comment:
       nit: `network` prefix is not needed since we are already in this package

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/PrincipalDeserializationFailureException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a kafka principal deserialization failure during request forwarding.
+ */
+public class PrincipalDeserializationFailureException extends AuthorizationException {

Review comment:
       nit: I feel `FailureException` is redundant. Can we just call it `PrincipalDeserializationException`?
   
   Also, I am not sure about this extending `AuthorizationException`. I would consider it more of an invalid request than an authorization failure, though the effect is the same. I think it's probably better to avoid categorizing it and just let it extend `ApiException`.
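
   Sketched in Scala for brevity (the real class would stay in Java in the clients module):
   ```scala
   // Hypothetical shape: extend ApiException directly rather than AuthorizationException.
   class PrincipalDeserializationException(message: String)
     extends org.apache.kafka.common.errors.ApiException(message)
   ```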

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {

Review comment:
       nit: we are doing more than building the response here; we are sending it. How about `sendFailedEnvelopeResponse`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {

Review comment:
       nit: `validatedForwardedRequest`

##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -180,7 +180,6 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
 
     /**
      * Create a new ClientRequest.
-     *

Review comment:
       nit: seems this change was not needed

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {
+    if (!config.forwardingEnabled || !request.context.fromPrivilegedListener) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // forwarding is not enabled yet, we would not handle the request.
+      closeConnection(request, Collections.emptyMap())
+      false
+    } else if (!authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+      // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
+      buildFailedEnvelopeResponse(request, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      false
+    } else if (!request.header.apiKey.forwardable) {
+      buildFailedEnvelopeResponse(request, Errors.INVALID_REQUEST)
+      false
+    } else if (request.principalSerde.isEmpty) {
+      buildFailedEnvelopeResponse(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
+      false
+    } else if (!controller.isActive) {
+      buildFailedEnvelopeResponse(request, Errors.NOT_CONTROLLER)
+      false
+    } else
+      true
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
+      forwardingManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def isForwardingEnabled(request: RequestChannel.Request): Boolean =
+    config.forwardingEnabled && request.principalSerde.isDefined
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
   override def handle(request: RequestChannel.Request): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
-      request.header.apiKey match {
-        case ApiKeys.PRODUCE => handleProduceRequest(request)
-        case ApiKeys.FETCH => handleFetchRequest(request)
-        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
-        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
-        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
-        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
-        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
-        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
-        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
-        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
-        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
-        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
-        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
-        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
-        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
-        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
-        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
-        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
-        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
-        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
-        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
-        case ApiKeys.END_TXN => handleEndTxnRequest(request)
-        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
-        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
-        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
-        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
-        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
-        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
-        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
-        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
-        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
-        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
-        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
-        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
-        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
-        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
-        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
-        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
-        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
-        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
-        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
-        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
-        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
-        case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
-        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
-        case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
-        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
-        // Until we are ready to integrate the Raft layer, these APIs are treated as
-        // unexpected and we just close the connection.
-        case ApiKeys.VOTE => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BEGIN_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.END_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.DESCRIBE_QUORUM => closeConnection(request, util.Collections.emptyMap())
+
+      val isValidRequest = !request.isForwarded || validateForwardRequest(request)

Review comment:
       I think it would be simpler to short-circuit with an early return:
   
   ```scala
   if (request.isForwarded && !validateForwardRequest(request))
     return
   ```

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)

Review comment:
       nit: instead of `original`, could we use `forwarded` in these names?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {

Review comment:
       nit: define return type

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
       > I was under the impression that byte buffer provides more information such as a read position and capacity/limits, which makes the deserialization easier.
   
   Hmm, not sure I get your point. Nothing is simpler than a byte array. The main question is whether we want to expose the actual request buffer to the plugin, especially since we still plan on using it afterwards. The plugin is treated as a trusted component in any case, so it might not make a big difference. Probably we should optimize here for simplicity.
   
   > If given a byte[], I'm afraid they need to convert to byte buffer internally eventually.
   
   That may or may not be true. If it is, users can just use `ByteBuffer.wrap`.
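
   As a sketch of the byte[]-based variant being discussed (the interface in this PR still uses `ByteBuffer`; the colon-separated encoding below is purely illustrative):
   ```scala
   import java.nio.charset.StandardCharsets

   class SimplePrincipalSerde extends KafkaPrincipalSerde { // hypothetical byte[]-based interface
     override def serialize(principal: KafkaPrincipal): Array[Byte] =
       s"${principal.getPrincipalType}:${principal.getName}".getBytes(StandardCharsets.UTF_8)

     override def deserialize(bytes: Array[Byte]): KafkaPrincipal = {
       // An implementation wanting buffer semantics can simply ByteBuffer.wrap(bytes).
       val Array(principalType, name) = new String(bytes, StandardCharsets.UTF_8).split(":", 2)
       new KafkaPrincipal(principalType, name)
     }
   }
   ```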




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513826471



##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,12 +37,6 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." },
-    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",

Review comment:
       Sounds good, will initiate a PR for that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493823776



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       As discussed offline, we can pass the expected version down to the Builder. The abstract builder already supports an explicit range of versions. In any case, it doesn't seem like we have a choice.
   
   By the way, one potential edge case here is that the broker receiving the request has upgraded to a later version than the controller. This would be possible in the middle of a rolling upgrade. I don't think there's an easy way to handle this. We could return UNSUPPORTED_VERSION to the client, but that would be surprising since the client chose a supported API based on ApiVersions and is not aware of the controller redirection.
   
   One idea to address this problem is to gate version upgrades to redirectable APIs by the IBP. Basically all of these APIs have become inter-broker APIs through redirection so they need the safeguard of the IBP. Feels like we might have to do this.
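
   To make the version pinning concrete, a sketch along these lines (names are illustrative; `controllerMaxVersion` would come from the inter-broker ApiVersions exchange, or be bounded by the IBP as suggested above):

   ```java
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigResource;
   import org.apache.kafka.common.requests.AlterConfigsRequest;

   final class RedirectVersionSketch {
       // Build the redirected request at a version the controller supports,
       // rather than whatever the forwarding broker would pick on its own.
       static AlterConfigsRequest buildRedirected(Map<ConfigResource, AlterConfigsRequest.Config> configs,
                                                  boolean validateOnly,
                                                  short clientVersion,
                                                  short controllerMaxVersion) {
           short version = (short) Math.min(clientVersion, controllerMaxVersion);
           return new AlterConfigsRequest.Builder(configs, validateOnly).build(version);
       }
   }
   ```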




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475714230



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##########
@@ -87,6 +87,16 @@ public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
         public AlterConfigsRequest build(short version) {
             return new AlterConfigsRequest(data, version);
         }
+

Review comment:
       In general we don't define equals or hashCode on these builders. Why are we defining them here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504286524



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DefaultPrincipalData;
+import org.apache.kafka.common.message.EnvelopeRequestData;
+import org.apache.kafka.common.message.EnvelopeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+
+import java.nio.ByteBuffer;
+
+public class EnvelopeRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<EnvelopeRequest> {
+
+        private final EnvelopeRequestData data;
+
+        public Builder(ByteBuffer embedData, String clientHostName) {
+            this(embedData, null, clientHostName);
+        }
+
+        public Builder(ByteBuffer embedData,

Review comment:
       nit: why don't we call it `requestData` to be consistent with the name used in the api spec?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
##########
@@ -70,9 +70,12 @@ public int throttleTimeMs() {
     @Override
     public Map<Errors, Integer> errorCounts() {
         HashMap<Errors, Integer> counts = new HashMap<>();
-        data.topics().forEach(result ->
-            updateErrorCounts(counts, Errors.forCode(result.errorCode()))
-        );
+        for (CreateTopicsResponseData.CreatableTopicResult result : data.topics()) {
+            Errors error = Errors.forCode(result.errorCode());
+            if (error != Errors.NONE) {

Review comment:
       Not sure why we need this change. I think the convention is to include `NONE` in error counts.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DefaultPrincipalData;
+import org.apache.kafka.common.message.EnvelopeRequestData;
+import org.apache.kafka.common.message.EnvelopeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+
+import java.nio.ByteBuffer;
+
+public class EnvelopeRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<EnvelopeRequest> {
+
+        private final EnvelopeRequestData data;
+
+        public Builder(ByteBuffer embedData, String clientHostName) {
+            this(embedData, null, clientHostName);
+        }
+
+        public Builder(ByteBuffer embedData,
+                       ByteBuffer serializedPrincipal,
+                       String clientHostName) {
+            super(ApiKeys.ENVELOPE);
+            this.data = new EnvelopeRequestData()
+                            .setRequestData(embedData)
+                            .setRequestPrincipal(serializedPrincipal)
+                            .setClientHostName(clientHostName);
+        }
+
+        @Override
+        public EnvelopeRequest build(short version) {
+            return new EnvelopeRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final EnvelopeRequestData data;
+
+    public EnvelopeRequest(EnvelopeRequestData data, short version) {
+        super(ApiKeys.ENVELOPE, version);
+        this.data = data;
+    }
+
+    public EnvelopeRequest(Struct struct, short version) {
+        super(ApiKeys.ENVELOPE, version);
+        this.data = new EnvelopeRequestData(struct, version);
+    }
+
+    public ByteBuffer requestData() {
+        return data.requestData();
+    }
+
+    public String clientHostName() {
+        return data.clientHostName();
+    }
+
+    public KafkaPrincipal requestPrincipal(KafkaPrincipalSerde principalSerde) {

Review comment:
       nit: I think it might be better to pull this out of the request class. The direction we're moving is toward dumber request/response classes. Eventually `EnvelopeRequest` will go away and we'll just use `EnvelopeRequestData`.
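
   Concretely, the call site would then read something like this (a sketch, assuming `requestPrincipal()` were changed to return the raw bytes and the broker layer owned the serde):

   ```java
   // The request object hands back raw bytes; deserialization happens in the
   // broker layer, which knows which serde is configured.
   ByteBuffer rawPrincipal = envelopeRequest.requestPrincipal();
   KafkaPrincipal principal = principalSerde.deserialize(rawPrincipal);
   ```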

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -43,15 +43,43 @@
  */
 public class ApiVersionsResponse extends AbstractResponse {
 
+    public static final int MIN_CONSTRAINT_IBP_VERSION = 31;

Review comment:
       I'm wondering if we really need the IBP to leak into the common library. It should really only be a broker concern. Seems like the only point is so that we can continue to use the factory methods defined below from the broker code. Is that right? Could we instead move the factories to the broker?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -78,8 +101,7 @@ public RequestAndSize parseRequest(ByteBuffer buffer) {
                         ", connectionId: " + connectionId +
                         ", listenerName: " + listenerName +
                         ", principal: " + principal +
-                        ", initialPrincipal: " + initialPrincipalName() +
-                        ", initialClientId: " + header.initialClientId(), ex);
+                        ", forwardingPrincipal: " + forwardingPrincipal, ex);

Review comment:
       nit: maybe print `forwardingPrincipal` only if it is defined

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal, short version);

Review comment:
       It is strange to couple the serialization of the principal with the version of the envelope request. This might help us in the case of default principal builder, but users with their own custom builder are on their own, right? I think it is better to be consistent and leave versioning to the principal builder as well. 
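
   For illustration, a principal builder that owns its versioning could embed a version byte in the payload itself, so nothing about the envelope version needs to be threaded in (a sketch with a made-up wire format, not the default serde):

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import org.apache.kafka.common.security.auth.KafkaPrincipal;

   // Illustrative self-versioning serde: the version lives in the payload.
   final class SelfVersionedPrincipalSerde {
       private static final byte VERSION = 0;

       static ByteBuffer serialize(KafkaPrincipal principal) {
           byte[] type = principal.getPrincipalType().getBytes(StandardCharsets.UTF_8);
           byte[] name = principal.getName().getBytes(StandardCharsets.UTF_8);
           ByteBuffer buf = ByteBuffer.allocate(1 + 4 + type.length + 4 + name.length);
           buf.put(VERSION);
           buf.putInt(type.length).put(type);
           buf.putInt(name.length).put(name);
           buf.flip();
           return buf;
       }

       static KafkaPrincipal deserialize(ByteBuffer buf) {
           byte version = buf.get(); // dispatch on this as the format evolves
           if (version != VERSION)
               throw new IllegalArgumentException("Unknown principal serde version " + version);
           byte[] type = new byte[buf.getInt()];
           buf.get(type);
           byte[] name = new byte[buf.getInt()];
           buf.get(name);
           return new KafkaPrincipal(new String(type, StandardCharsets.UTF_8),
                                     new String(name, StandardCharsets.UTF_8));
       }
   }
   ```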

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.EnvelopeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class EnvelopeResponse extends AbstractResponse {
+
+    private final EnvelopeResponseData data;
+
+    public EnvelopeResponse(int throttleTimeMs, AbstractResponse innerResponse, short innerApiVersion) {

Review comment:
       In a similar vein, I think it's better to not include serialization logic in the response object. It tends to hide some of the details like byte buffer allocation that we might want to control at another level. 

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EnvelopeResponse.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.EnvelopeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class EnvelopeResponse extends AbstractResponse {
+
+    private final EnvelopeResponseData data;
+
+    public EnvelopeResponse(int throttleTimeMs, AbstractResponse innerResponse, short innerApiVersion) {
+        Struct dataStruct = innerResponse.toStruct(innerApiVersion);
+        ByteBuffer buffer = ByteBuffer.allocate(dataStruct.sizeOf());
+        dataStruct.writeTo(buffer);
+        buffer.flip();
+
+        this.data = new EnvelopeResponseData()
+                        .setThrottleTimeMs(throttleTimeMs)
+                        .setResponseData(buffer);
+    }
+
+    public EnvelopeResponse(EnvelopeResponseData data) {
+        this.data = data;
+    }
+
+    public AbstractResponse embedResponse(RequestHeader originalHeader) {

Review comment:
       Same here. We can return `ByteBuffer` and leave parsing to higher layers.
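
   In other words, something like this at the higher layer (`responseData()` as a raw-bytes accessor and `parseInnerResponse` are stand-ins for wherever the parsing ends up living):

   ```java
   // The response class only exposes the wrapped bytes; the caller pairs them
   // with the original request header and does the parsing itself.
   ByteBuffer wrappedBytes = envelopeResponse.responseData();
   AbstractResponse inner = parseInnerResponse(originalHeader, wrappedBytes);
   ```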

##########
File path: clients/src/main/resources/common/message/EnvelopeRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 58,
+  "type": "request",
+  "name": "EnvelopeRequest",
+  // Request struct for redirection.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true,
+      "about": "The embedded request header and data."},
+    { "name": "PrincipalIdToken", "type": "bytes", "tag": 0, "taggedVersions": "0+" },

Review comment:
       Do we have a use case for this yet? I don't see that it gets used anywhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r467215647



##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def alterConfigsRequest =
     new AlterConfigsRequest.Builder(
-      Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
-        new AlterConfigsRequest.Config(Collections.singleton(
-          new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
-        ))), true).build()
+        Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
+          new AlterConfigsRequest.Config(Collections.singleton(
+            new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+          ))), true).build()

Review comment:
       Let me check around.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464567004



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -34,6 +34,8 @@
     private final boolean expectResponse;
     private final int requestTimeoutMs;
     private final RequestCompletionHandler callback;
+    private final String initialPrincipalName;
+    private final String initialClientId;

Review comment:
       It is not necessary as we don't check nulls for these fields.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) =>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as admin client doesn't know
+      // how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
       Will requests only flow to the data plane if they use the same listener?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {

Review comment:
       Yes, the purpose is to always handle a forwarding request even if the IBP is not 2.7 yet. This is because some brokers may have already upgraded their IBP and started sending forwarding requests, which is totally legitimate.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.incrementalAlterConfigs(
+          authorizedResources, incrementalAlterConfigsRequest.data.validateOnly)
+
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new IncrementalAlterConfigsRequest.Builder(
+        AlterConfigsUtil.generateIncrementalRequestData( authorizedResources.map {
+          case (resource, ops) => resource -> ops.asJavaCollection
+        }.asJava, incrementalAlterConfigsRequest.data().validateOnly()))
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedIncrementalAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request even if we are not the controller,
+      // as admin client doesn't know how to find the controller.

Review comment:
       Sg!

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,6 +37,12 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." }
+      "flexibleVersions": "none", "about": "The client ID string." },
+    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",
+      "nullableVersions": "2+", "default": "null", "ignorable": true,
+      "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging purpose." },

Review comment:
       I don't think we need initial client id for audit logging, is there some other logging you have in mind?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,
+                         short requestVersion,
+                         String clientId,
+                         int correlationId,
+                         String initialPrincipalName,
+                         String initialClientId) {

Review comment:
       Not necessary, as explained.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()

Review comment:
       Note that this is propagating to the sender broker.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513750141



##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
       I was under the impression that a ByteBuffer provides more information, such as a read position and capacity/limits, which makes deserialization easier. If given a byte[], I'm afraid they would need to convert it to a ByteBuffer internally eventually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493751026



##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
-                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+  private def alterBrokerConfigs(resource: ConfigResource,
+                                 validateOnly: Boolean,
+                                 configProps: Properties,
+                                 configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val brokerId = getBrokerId(resource)
     val perBrokerConfig = brokerId.nonEmpty
     this.config.dynamicConfig.validate(configProps, perBrokerConfig)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      if (perBrokerConfig)
+      if (perBrokerConfig) {
+        val previousConfigProps = config.dynamicConfig.currentDynamicBrokerConfigs
         this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+        this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, previousConfigProps)

Review comment:
       The rationale is to trigger a reload of the SSL store file via the ZK notification. @cmccabe and @rajinisivaram came up with the idea of augmenting the path to
   ```
   //path//to//ssl//store//file
   ```
   when a reload is requested on the receiving broker; by propagating such a path, other brokers would see a difference and thus reload their corresponding store files as well. Afterwards, we need to trim the path back to single slashes once the notification has been handled:
   ```
   /path/to/ssl/store/file
   ```
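
   A minimal sketch of that round-trip (plain string manipulation; the real logic lives in `DynamicBrokerConfig`):

   ```java
   // Augment: make the path textually different so other brokers see a config
   // change and reload, while the path still resolves to the same file.
   String augmented = "/path/to/ssl/store/file".replace("/", "//");
   // augmented == "//path//to//ssl//store//file"

   // Trim: normalize back to single slashes once the notification is handled.
   String trimmed = augmented.replace("//", "/");
   // trimmed == "/path/to/ssl/store/file"
   ```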




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504298663



##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal, short version);

Review comment:
       It is strange to couple the serialization of the principal with the version of the envelope request. This might help us in the case of default principal builder, but users with their own custom builder are on their own, right? I think it is better to be consistent and always leave versioning to the principal builder. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504327985



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
##########
@@ -70,9 +70,12 @@ public int throttleTimeMs() {
     @Override
     public Map<Errors, Integer> errorCounts() {
         HashMap<Errors, Integer> counts = new HashMap<>();
-        data.topics().forEach(result ->
-            updateErrorCounts(counts, Errors.forCode(result.errorCode()))
-        );
+        for (CreateTopicsResponseData.CreatableTopicResult result : data.topics()) {
+            Errors error = Errors.forCode(result.errorCode());
+            if (error != Errors.NONE) {

Review comment:
       I guess there are some inconsistencies between different RPCs, as I spotted cases excluding NONE. I will open a separate JIRA for the cleanup and revert the change here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513743473



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -104,7 +108,9 @@ object ApiVersion {
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
     // Introduced AlterIsr (KIP-497)
-    KAFKA_2_7_IV2
+    KAFKA_2_7_IV2,
+    // Introduced IBP based constraints for ApiVersion (KIP-590)

Review comment:
       I think it's ok to remove this flag for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-696225206


   Hi @abbccdda ,
   
   Thanks for the PR!  It looks good.  I like the idea behind `ForwardRequestHandler`.
   
   Since this class doesn't have any internal state, I wonder if it would be more Scala-like to just have a function that takes some callbacks as arguments.
   
   Can we get rid of the need for the `customizedAuthorizationError` callback by having `resourceSplitByAuthorization` return a `Map[RK, ApiError]` instead of `Map[RK, RV]`?  When would RV be needed for keys where authorization failed?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       Quotas are one aspect of this work that needs more consideration. What we don't want is for the inter-broker channel to get affected by the individual client throttle, which is what will happen with the current patch. What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but we set the envelope throttle time to 0 and ensure that the channel does not get throttled.
   
   For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles.
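
   As a rough sketch of that split (the surrounding names are illustrative; only `maybeRecordAndGetThrottleTimeMs` is the real call):

   ```java
   // On the controller, handling a forwarded request: record the client quota
   // and carry the throttle inside the wrapped response, but never throttle
   // the inter-broker channel itself.
   int clientThrottleMs = maybeRecordAndGetThrottleTimeMs(request); // record only, no channel mute
   AbstractResponse inner = buildResponseWithThrottle(clientThrottleMs); // throttle travels inside
   int envelopeThrottleMs = 0; // the envelope on the broker channel is never throttled
   ```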
   
   One challenging aspect is how this will affect quota metrics. Currently quota/throttling metrics are relatively simple because they are recorded separately by each broker. However, here the controller is the one that is tracking the throttling for the client across multiple inbound connections from multiple brokers. This means that the broker that is applying a throttle for a forwarded request may not have actually observed a quota violation. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this.
   
   cc @apovzner @rajinisivaram 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
       One challenge we have here is that there are two levels of errors. The current patch seems to conflate the two, which makes it confusing. I think we need a structure which allows us to separate the errors possible at the envelope level and those possible at the request level. What I'm thinking is this:
   
   1. For cluster auth and principal serde errors, we should return the envelope error and null response body.
   2. For everything else, we return envelope error NONE and just pass through whatever error is in the response.
   
   Does that make sense?
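
   Sketched with the schema from this PR (construction details illustrative):

   ```java
   // 1. Envelope-level failure: error on the envelope, no wrapped body.
   EnvelopeResponseData envelopeFailure = new EnvelopeResponseData()
       .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
       .setResponseData(null);

   // 2. Request-level outcome: envelope error is NONE; whatever error the
   // inner response carries passes through inside the wrapped bytes.
   EnvelopeResponseData passThrough = new EnvelopeResponseData()
       .setErrorCode(Errors.NONE.code())
       .setResponseData(innerResponseBytes);
   ```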

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {

Review comment:
       Can we move some of the checks from `maybeForward` here? This is the flow I'm thinking about:
   
   1. First check authorization => CLUSTER_AUTHORIZATION_FAILURE
   2. Verify forwarding is enabled => INVALID_REQUEST
   3. Verify the api is forwardable => INVALID_REQUEST 
   
   If all of these pass, then the request continues down the normal handling path.
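
   As pseudocode (helper names are illustrative; `forwardable` is the flag on `ApiKeys` used elsewhere in this PR):

   ```java
   import java.util.Optional;

   // Proposed ordering of envelope checks before normal handling continues.
   Optional<Errors> validateEnvelope(boolean clusterAuthorized, boolean forwardingEnabled, ApiKeys apiKey) {
       if (!clusterAuthorized)       // 1. CLUSTER_ACTION on the privileged listener
           return Optional.of(Errors.CLUSTER_AUTHORIZATION_FAILED);
       if (!forwardingEnabled)       // 2. redirection must be enabled on this broker
           return Optional.of(Errors.INVALID_REQUEST);
       if (!apiKey.forwardable)      // 3. the wrapped api must be forwardable
           return Optional.of(Errors.INVALID_REQUEST);
       return Optional.empty();      // all checks pass: continue normally
   }
   ```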




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r467214507



##########
File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##########
@@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def alterConfigsRequest =
     new AlterConfigsRequest.Builder(
-      Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
-        new AlterConfigsRequest.Config(Collections.singleton(
-          new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
-        ))), true).build()
+        Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
+          new AlterConfigsRequest.Config(Collections.singleton(
+            new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
+          ))), true).build()

Review comment:
       Can we get rid of whitespace-only changes like this, or at least move them to another PR?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r473158779



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
##########
@@ -76,6 +77,16 @@ public AlterClientQuotasRequest build(short version) {
         public String toString() {
             return data.toString();
         }
+
+        @Override
+        public boolean equals(Object other) {

Review comment:
       Added the equality check for the sake of EasyMock verification.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513759802



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),
+            error
+          )
+
+          envelopeContext.brokerContext.buildResponse(envelopeResponse)
+        case None =>
+          context.buildResponse(abstractResponse)
+      }
+    }
+
+    def responseString(response: AbstractResponse): Option[String] = {
+      if (RequestChannel.isRequestLoggingEnabled)
+        Some(envelopeContext match {
+          case Some(envelopeContext) =>
+            response.toString(envelopeContext.brokerContext.apiVersion)

Review comment:
       Sg, but I guess we need to keep it as is for now to try using the correct api version.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-696281629


   @cmccabe Sounds good to me to remove the customized error helper.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465275440



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##########
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
     }
 
     public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-        this(new RequestHeaderData().
-                setRequestApiKey(requestApiKey.id).
-                setRequestApiVersion(requestVersion).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+        this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+    }
+
+    public RequestHeader(ApiKeys requestApiKey,
+                         short requestVersion,
+                         String clientId,
+                         int correlationId,
+                         String initialPrincipalName,
+                         String initialClientId) {

Review comment:
       Actually, we check nulls for these two in the `isForwardingRequest` method. I don't feel strongly about this, but it is usually better to use an Optional when such values are not always present.
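   
   A hedged sketch of what the Optional-based accessors could look like (the `data` accessor and field names here are assumptions based on the tagged header fields in this PR, not the PR's actual code):
   
    ```scala
    // Expose the tagged header fields as Options so callers cannot skip the null check.
    def initialPrincipalName: Option[String] = Option(header.data.initialPrincipalName)
    def initialClientId: Option[String] = Option(header.data.initialClientId)
    
    // isForwardingRequest then falls out of the Options directly.
    def isForwardingRequest: Boolean = initialPrincipalName.isDefined && initialClientId.isDefined
    ```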




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493520165



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + configName
+          if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       This means an update was requested, but not necessarily that the file has changed?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={

Review comment:
       nit: `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
     if (brokerId == ConfigEntityName.Default)
       brokerConfig.dynamicConfig.updateDefaultConfig(properties)
     else if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
+      val persistentProps = brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = true)
+      // The filepath was changed for equivalent replacement, which means we should reload
+      if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) {
+        brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps)
+      }

Review comment:
       Can't we put this logic in `DynamicBrokerConfig`?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {

Review comment:
       `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")

Review comment:
       Does this get reset somewhere or will we keep adding `/`?
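   
   To illustrate the concern with a quick sketch: `String.replace` rewrites every occurrence, so if the augmented value is fed back in without being trimmed first, the separators compound:
   
    ```scala
    val once  = "/etc/ssl/keystore.jks".replace("/", "//")   // "//etc//ssl//keystore.jks"
    val twice = once.replace("/", "//")                      // "////etc////ssl////keystore.jks"
    // without a reset (e.g. the trim step) between updates, the slashes keep growing
    ```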




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510536894



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1005,6 +1013,36 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leave the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    // The forwarding broker and the active controller should have the same DNS resolution, and we need
+    // to have the original client address for authentication purpose.
+    val originalClientAddress = InetAddress.getByName(envelopeRequest.clientHostName)

Review comment:
       I was thinking a little bit about this and trying to decide if the envelope request should have a more literal representation of the client IP address. The way it is working right now, it looks like the following:
   
   1) Use `Socket.getInetAddress` to populate `RequestContext.clientAddress`.
   2) Use `InetAddress.getHostName` to populate the `clientHostName` field in the envelope request. This does a reverse DNS lookup based on the IP address from 1).
   3) Now we send `clientHostName` over the wire. It gets unpacked here by doing a DNS lookup to get back to the `InetAddress` object.
   
   So it seems we should skip the DNS translation and just use the IP address from 1). The `InetAddress` class gives us `getAddress` and `getHostAddress`. The first provides the raw byte representation of the IP address, while the latter provides a textual representation. I am thinking we should use `getAddress` and let this field be represented as bytes. What do you think?
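   
   For illustration, a minimal sketch of the bytes-based variant (assuming the envelope carries a raw-bytes field instead of `clientHostName`):
   
    ```scala
    import java.net.InetAddress
    
    // Forwarding broker: capture the raw IP bytes; no reverse DNS involved.
    val clientAddressBytes: Array[Byte] = request.context.clientAddress.getAddress
    
    // Controller: rebuild the InetAddress directly from the bytes.
    // InetAddress.getByAddress performs no DNS lookup.
    val originalClientAddress = InetAddress.getByAddress(clientAddressBytes)
    ```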




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465272340



##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,6 +37,12 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." }
+      "flexibleVersions": "none", "about": "The client ID string." },
+    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",
+      "nullableVersions": "2+", "default": "null", "ignorable": true,
+      "about": "Optional value of the initial principal name when the request is redirected by a broker, for audit logging purpose." },

Review comment:
       Yeah, I was actually thinking about the request log. I thought that it may be useful to print them out there as well: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/RequestChannel.scala#L229.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516950684



##########
File path: core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
##########
@@ -54,8 +55,15 @@ class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness wit
 
   private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
                                         authorizer: Option[String] = None): Unit = {
+    val brokerConfigs = (0 until 3).map { node =>

Review comment:
       Seems ok to remove




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513608319



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),

Review comment:
       Hmm.. It looks like we do not serialize the response header, but I think we probably should. Today it only includes the correlationId, but who knows how it will evolve in the future? Since we do serialize the request header, it seems better to be consistent. 
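   
   Roughly, as a sketch (the `serialize` helper on the response header is an assumption):
   
    ```scala
    import java.nio.ByteBuffer
    
    // Serialize header + body so the forwarding broker can relay the
    // controller's response verbatim, correlation id included.
    val headerBuffer = responseHeader.serialize()   // assumed helper
    val bodyBuffer = abstractResponse.serializeBody(context.header.apiVersion)
    val payload = ByteBuffer.allocate(headerBuffer.remaining + bodyBuffer.remaining)
    payload.put(headerBuffer).put(bodyBuffer)
    payload.flip()
    ```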




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-670693346


   I think it would be good to split this PR up a little bit.  It seems like we could have a split like this:
   
   PR 1.  `Add flag to the RequestContext` and `Add initial principal name`
   
   PR 2. Authorization changes for AlterConfigs / IncrementalAlterConfigs, forwarding if required, IBP check, bump RPC versions of AlterConfigs / IncrementalAlterConfigs
   
   PR 3. AdminClient changes in behavior based on versions of AlterConfigs / IncrementalAlterConfigs, AlterConfigsUtil, etc.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513667101



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {

Review comment:
       Unless the internal config is present, I think we should treat the envelope as nonexistent. Once we are ready to enable it in the IBP, we will accept the envelope request even if the local IBP is not high enough.
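   
   I.e., something of this shape early in request handling (a sketch only; `metadataQuorumEnabled` is a hypothetical accessor for the internal config):
   
    ```scala
    // Without the internal config, reject the envelope outright instead of
    // parsing the embedded request.
    if (request.header.apiKey == ApiKeys.ENVELOPE && !config.metadataQuorumEnabled)
      sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception())
    ```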




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509645115



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+                val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment:
       Had a try, but it seems Java's Optional doesn't have an `asScala` method by itself.
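   
   For what it's worth, the `asScala` extension is not on `java.util.Optional` itself; it comes from an implicit enrichment, e.g. the scala-java8-compat converters (a sketch, assuming that dependency is available):
   
    ```scala
    import scala.compat.java8.OptionConverters._
    
    // the implicit conversion adds asScala to java.util.Optional
    val principalSerde: Option[KafkaPrincipalSerde] = channel.principalSerde.asScala
    ```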




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r462738230



##########
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##########
@@ -85,11 +89,12 @@ public ApiKeys apiKey() {
     public RequestHeader makeHeader(short version) {
         short requestApiKey = requestBuilder.apiKey().id;
         return new RequestHeader(
-            new RequestHeaderData().
-                setRequestApiKey(requestApiKey).
-                setRequestApiVersion(version).
-                setClientId(clientId).
-                setCorrelationId(correlationId),
+            new RequestHeaderData()

Review comment:
       You commented on the previous PR about the style here. The reasoning is that this is the more common style in our codebase, compared to putting the period at the end of the line.

##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -151,6 +147,20 @@ public void testDnsLookupFailure() {
         assertFalse(client.ready(new Node(1234, "badhost", 1234), time.milliseconds()));
     }
 
+    @Test
+    public void testIncludeInitialPrincipalNameAndClientIdInHeader() {

Review comment:
       This is the new test, the rest of changes in this file are just side cleanups.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##########
@@ -64,23 +64,11 @@ public String value() {
 
     public static class Builder extends AbstractRequest.Builder<AlterConfigsRequest> {
 
-        private final AlterConfigsRequestData data = new AlterConfigsRequestData();
+        private final AlterConfigsRequestData data;
 
-        public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {

Review comment:
       Moved to `AlterConfigsUtil`

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2207,27 +2203,6 @@ void handleFailure(Throwable throwable) {
         return futures;
     }
 
-    private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection<ConfigResource> resources,

Review comment:
       Moved to `AlterConfigsUtil`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509540617



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =
+      if (envelopeContext.isDefined) {

Review comment:
       nit: use `match`

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+                val principalSerde = Option(channel.principalSerde.orElse(null))
+                val req =
+                if (header.apiKey == ApiKeys.ENVELOPE) {

Review comment:
       nit: this is misaligned. It might be better to pull the body here into a separate method (e.g. `parseEnvelopeRequest`)

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())

Review comment:
       As mentioned above, you can see the rest of the cases in this class where we check CLUSTER_ACTION and they all return `CLUSTER_AUTHORIZATION_FAILURE`.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+        case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest)

Review comment:
       We should have a check at the beginning of `handle` to restrict the "forwardable" APIs. 
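   
   E.g., a guard of roughly this shape at the top of `handle` (reusing the PR's `forwardable` flag; the exact error returned is an assumption):
   
    ```scala
    // Reject envelopes that wrap an API we never forward.
    if (request.envelopeContext.isDefined && !request.header.apiKey.forwardable)
      sendErrorResponseMaybeThrottle(request, Errors.INVALID_REQUEST.exception())
    ```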

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =

Review comment:
       nit: add braces to all of these methods. Even though they are not required, braces make it easier to see the scope

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =
+      if (envelopeContext.isDefined) {
+        // Right now only the NOT_CONTROLLER error gets handled by the forwarding
+        // broker retry. Other errors should be fatal and propagated to the client.
+        val envelopeError = if (error.equals(Errors.NOT_CONTROLLER))
+          Errors.NOT_CONTROLLER
+        else
+          Errors.NONE
+
+        val envelopeResponse = new EnvelopeResponse(
+          abstractResponse.throttleTimeMs(),
+          abstractResponse.serializeBody(context.header.apiVersion),
+          envelopeError
+        )
+        envelopeContext.get.brokerContext.buildResponse(envelopeResponse)
+      } else
+        context.buildResponse(abstractResponse)
+
+    def responseString(response: AbstractResponse): Option[String] =
+      if (RequestChannel.isRequestLoggingEnabled) {
+        Some(if (envelopeContext.isDefined)
+          response.toString(envelopeContext.get.brokerContext.apiVersion)
+        else
+          response.toString(context.apiVersion))
+      } else
+        None
+
+    def headerForLogging(): RequestHeader =
+      if (envelopeContext.isDefined)
+        envelopeContext.get.brokerContext.header
+      else
+        context.header
+
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.

Review comment:
       nit: can we move this back to where the request parsing logic is? Otherwise it becomes a bit hidden.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -250,7 +301,9 @@ object RequestChannel extends Logging {
     }
 
     def releaseBuffer(): Unit = {
-      if (buffer != null) {
+      if (envelopeContext.isDefined)

Review comment:
       nit: use `match`

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
     INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " +
             "voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " +

Review comment:
       Not sure I follow. All current inter-broker APIs are gated by `ClusterAction` and will return `CLUSTER_AUTHORIZATION_FAILURE` if the principal does not have access. There is no distinction between clients and brokers. It's not clear to me why we need something different here.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -129,6 +137,27 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
     requestThread.wakeup()
   }
+
+  override def forwardRequest(responseToOriginalClient: (RequestChannel.Request, Int =>
+                                AbstractResponse, Option[Send => Unit]) => Unit,
+                              request: RequestChannel.Request,
+                              callback: Option[Send => Unit] = Option.empty): Unit = {
+    val serializedRequestData = request.body[AbstractRequest].serialize(request.header)

Review comment:
       We want to avoid this serialization since it introduces the possibility for the request to be altered by the forwarding broker. The `RequestChannel.Request` object retains the reference to the original buffer, which we can use here, but we need to tell the channel to delay releasing the buffer using `ApiKeys.requiresDelayedAllocation` for all of the "forwardable" APIs.
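   
   A hedged sketch of the buffer pass-through (the `EnvelopeRequest.Builder` arguments shown here are assumptions):
   
    ```scala
    // Wrap the original, still-retained request buffer instead of
    // re-serializing the parsed body, so the bytes cannot be altered.
    val envelope = new EnvelopeRequest.Builder(
      request.buffer.duplicate(),                            // raw client bytes
      principalSerde.serialize(request.context.principal),   // forwarded principal
      request.context.clientAddress.getAddress               // raw client IP bytes
    )
    ```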

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val quorumBasedControllerProp = "quorum.based.controller"

Review comment:
       How about `enable.metadata.quorum`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+                val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment:
       nit: you can just use `channel.principalSerde.asScala`

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1025,6 +1027,7 @@ object KafkaConfig {
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMsProp, LONG, Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
+      .define(quorumBasedControllerProp, BOOLEAN, false, LOW, "Private configuration for turning on/off KIP-500 mode")

Review comment:
       Use `defineInternal`

##########
File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
##########
@@ -167,4 +171,24 @@ public void close() {
             oldPrincipalBuilder.close();
     }
 
+    @Override
+    public ByteBuffer serialize(KafkaPrincipal principal) {
+        DefaultPrincipalData data = new DefaultPrincipalData()
+                                        .setType(principal.getPrincipalType())
+                                        .setName(principal.getName())
+                                        .setTokenAuthenticated(principal.tokenAuthenticated());
+        Struct dataStruct = data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        ByteBuffer buffer = ByteBuffer.allocate(dataStruct.sizeOf());
+        dataStruct.writeTo(buffer);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public KafkaPrincipal deserialize(ByteBuffer bytes) {
+        DefaultPrincipalData data = new DefaultPrincipalData(
+            DefaultPrincipalData.SCHEMAS[DefaultPrincipalData.SCHEMAS.length - 1].read(bytes),
+            DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);

Review comment:
       Rather than assuming highest supported version, we should include the version in the serialized data. The simple thing would be to write the version first, then write the payload.
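   
   A minimal sketch of the version-first layout, reusing the generated `DefaultPrincipalData` accessors quoted above:
   
    ```scala
    import java.nio.ByteBuffer
    
    def serialize(data: DefaultPrincipalData): ByteBuffer = {
      val version = DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION
      val struct = data.toStruct(version)
      val buffer = ByteBuffer.allocate(2 + struct.sizeOf)
      buffer.putShort(version)   // write the version first ...
      struct.writeTo(buffer)     // ... then the payload
      buffer.flip()
      buffer
    }
    
    def deserialize(buffer: ByteBuffer): DefaultPrincipalData = {
      val version = buffer.getShort()   // read back whatever version was written
      new DefaultPrincipalData(DefaultPrincipalData.SCHEMAS(version).read(buffer), version)
    }
    ```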

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined && !controller.isActive) {
+      sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+    } else if (!controller.isActive && couldDoRedirection(request)) {
+      redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def couldDoRedirection(request: RequestChannel.Request): Boolean =

Review comment:
       We use 'forward' and 'redirect' interchangeably throughout the PR, but the names do suggest different behavior. In my mind 'redirection' suggests that we are telling the client to go somewhere else, while 'forward' suggests that the broker is passing the request through to its destination. So maybe we can stick with 'forward' consistently (e.g. `isForwardingEnabled`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r505852939



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -251,7 +253,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
         DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
     ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
     UPDATE_FEATURES(57, "UpdateFeatures",
-        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS);
+        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS),
+    ENVELOPE(58, "Envelope", EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);

Review comment:
       I think we do have that logic enforced by setting `zeroCopy` to true for the request data field in the RPC JSON.
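   
   Presumably via a schema entry along these lines in the envelope's RPC JSON (the exact field name is an assumption):
   
    ```json
    { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true,
      "about": "The embedded request data, kept as a raw buffer rather than copied." }
    ```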




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493752855



##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig,
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
-      if (id != this.config.brokerId)
+      // Under redirection, it is possible to handle config changes targeting at brokers other than the controller.

Review comment:
       The logic is needed when there is an AlterConfigsRequest targeting a specific broker. Since the non-controller node will no longer handle AlterConfigs, it is possible to see a redirected config change request with a broker.id different from the controller's broker.id.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,

Review comment:
       nit: might be useful to document the expectation that `resources` is a subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for `IncrementalAlterConfigsRequest.Builder` rather than a static class.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment:
       Get rid of this TODO. We do not need to remove IBP internal versions.

##########
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##########
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
                                        request: AbstractRequest.Builder[_ <: AbstractRequest],
-                                       handler: RequestCompletionHandler)
+                                       handler: RequestCompletionHandler,
+                                       initialPrincipalName: String = null,

Review comment:
       nit: why don't we add a case class and make this optional? For example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are provided explicit.
   

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),

Review comment:
       In general, the forwarded request may have a different version than the client request. I'm wondering if we should keep the version the same in case there are semantic differences. As an example, a newer version of the API may introduce unexpected error codes. Unless we have logic to convert those error codes, we might break compatibility unexpectedly.
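   
   For instance, pinning the forwarded request to the version the client negotiated would sidestep that (a sketch):
   
    ```scala
    // Build the controller-bound request with the original header's version
    // rather than whatever the forwarding broker would negotiate.
    val forwardedRequest = requestBuilder.build(request.header.apiVersion)
    ```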

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -459,7 +459,10 @@ class AclAuthorizer extends Authorizer with Logging {
       val apiKey = if (ApiKeys.hasId(requestContext.requestType)) ApiKeys.forId(requestContext.requestType).name else requestContext.requestType
       val refCount = action.resourceReferenceCount
 
-      s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource for request = $apiKey with resourceRefCount = $refCount"
+      val initialPrincipalName = requestContext.initialPrincipalName
+      val initialPrincipalMessage = if(initialPrincipalName != null) s", on behalf of initial principal =$initialPrincipalName," else ""

Review comment:
       nit: space after `if`

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -547,7 +553,8 @@ class AdminManager(val config: KafkaConfig,
       None
     else {
       val id = resourceNameToBrokerId(resource.name)
-      if (id != this.config.brokerId)
+      // Under redirection, it is possible to handle config changes targeting at brokers other than the controller.

Review comment:
       The comment doesn't seem to make sense here. Seems like the logic doesn't have anything to do with the controller?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {

Review comment:
       nit: this is misaligned

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -117,10 +119,26 @@ class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache
                                   callback: RequestCompletionHandler): Unit = {
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
   }
+
+  private[server] def forwardRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest],
+                                     responseToOriginalClient: (RequestChannel.Request, Int => AbstractResponse,

Review comment:
       This function has 3 callbacks... It would be nice if we could figure out how to pass through the `ForwardRequestHandler` directly.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3064,12 +3272,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>

Review comment:
       It would be helpful to have a comment explaining this. It does not seem obvious.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -273,31 +275,632 @@ class KafkaApisTest {
       .setIncludeSynonyms(true)
       .setResources(List(new DescribeConfigsRequestData.DescribeConfigsResource()
         .setResourceName("topic-1")
-        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava)).build(requestHeader.apiVersion))
+        .setResourceType(ConfigResource.Type.TOPIC.id)).asJava))
+      .build(requestHeader.apiVersion),
+      requestHeader = Option(requestHeader))
     createKafkaApis(authorizer = Some(authorizer)).handleDescribeConfigsRequest(request)
 
     verify(authorizer, adminManager)
   }
 
+  @Test
+  def testAlterClientQuotasWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithNonControllerAndRedirectionDisabled(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest)
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.expect(adminManager.alterClientQuotas(anyObject(), EasyMock.eq(false)))
+      .andReturn(Map(quotaEntity -> ApiError.NONE))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    // Should just handle the config change since IBP is low
+    createKafkaApis(interBrokerProtocolVersion = KAFKA_2_6_IV0,
+      authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NONE))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasWithRedirection(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0)
+
+    val request = buildRequest(new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    expectNoThrottling()
+
+    val redirectRequestBuilder = new AlterClientQuotasRequest.Builder(
+      Set(new ClientQuotaAlteration(quotaEntity, Collections.emptySet())).asJava, false)
+
+    val capturedCallback = EasyMock.newCapture[ClientResponse => AbstractResponse]()
+
+    EasyMock.expect(redirectionManager.forwardRequest(
+      EasyMock.eq(redirectRequestBuilder),
+      anyObject[(RequestChannel.Request, Int => AbstractResponse,
+        Option[Send => Unit]) => Unit](),
+      EasyMock.eq(request),
+      EasyMock.capture(capturedCallback),
+      anyObject()
+    )).once()
+
+    val clientResponse: ClientResponse = EasyMock.createNiceMock(classOf[ClientResponse])
+    val alterClientQuotasResponse = new AlterClientQuotasResponse(
+      Map(quotaEntity -> ApiError.NONE).asJava, 10
+    )
+    EasyMock.expect(clientResponse.responseBody).andReturn(alterClientQuotasResponse)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel,
+      authorizer, controller, redirectionManager, clientResponse)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    assertEquals(alterClientQuotasResponse, capturedCallback.getValue.apply(clientResponse))
+
+    EasyMock.verify(controller, redirectionManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequestWithNonController(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(false)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map(quotaEntity -> Errors.NOT_CONTROLLER))
+
+    verify(authorizer, adminManager)
+  }
+
+  @Test
+  def testAlterClientQuotasAsForwardingRequest(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+    // As a forwarding request, we would use CLUSTER_ACTION to do a separate round of auth.
+    authorizeResource(authorizer, AclOperation.CLUSTER_ACTION, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.DENIED)
+
+    val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
+    val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))
+
+    // Include extra header fields for forwarding request check
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion,
+      clientId, 0, "initial-principal", "initial-client")
+
+    val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(alterClientQuotasRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    val capturedResponse = expectNoThrottling()
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
+      adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleAlterClientQuotasRequest(request)
+
+    verifyAlterClientQuotaResult(alterClientQuotasRequest,
+      capturedResponse, Map( quotaEntity -> Errors.BROKER_AUTHORIZATION_FAILURE))
+
+    verify(authorizer, adminManager)
+  }
+
+  private def verifyAlterClientQuotaResult(alterClientQuotasRequest: AlterClientQuotasRequest,
+                                           capturedResponse: Capture[RequestChannel.Response],
+                                           expected: Map[ClientQuotaEntity, Errors]): Unit = {
+    val response = readResponse(ApiKeys.ALTER_CLIENT_QUOTAS, alterClientQuotasRequest, capturedResponse)
+      .asInstanceOf[AlterClientQuotasResponse]
+    val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
+    response.complete(futures.asJava)
+    futures.foreach {
+      case (entity, future) =>
+        future.whenComplete((_, thrown) =>
+          assertEquals(thrown, expected(entity).exception())
+        ).isDone
+    }
+  }
+
+  @Test
+  def testCreateTopicsWithAuthorizer(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    val operation = AclOperation.CREATE
+    val topicName = "topic-1"
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion,
+      clientId, 0)
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    authorizeResource(authorizer, operation, ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    authorizeResource(authorizer, AclOperation.DESCRIBE_CONFIGS, ResourceType.TOPIC,
+      topicName, AuthorizationResult.ALLOWED, logIfDenied = false)
+
+    expectNoThrottling()
+
+    val topicsAuthorized = new CreateTopicsRequestData.CreatableTopicCollection(1)
+    val topicToCreate = new CreateTopicsRequestData.CreatableTopic()
+      .setName(topicName)
+    topicsAuthorized.add(topicToCreate)
+
+    val timeout = 10
+    val request = buildRequest(new CreateTopicsRequest.Builder(new CreateTopicsRequestData()
+      .setTimeoutMs(timeout)
+      .setValidateOnly(false)
+      .setTopics(topicsAuthorized))
+      .build(requestHeader.apiVersion))
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.eq(request), EasyMock.eq(6))).andReturn(UnboundedControllerMutationQuota)
+
+    EasyMock.expect(adminManager.createTopics(
+      EasyMock.eq(timeout),
+      EasyMock.eq(false),
+      EasyMock.eq(Map(topicName -> topicToCreate)),
+      anyObject(),
+      EasyMock.eq(UnboundedControllerMutationQuota),
+      anyObject()))
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, authorizer, adminManager, controller)
+
+    createKafkaApis(authorizer = Some(authorizer)).handleCreateTopicsRequest(request)
+
+    verify(authorizer, adminManager, clientControllerQuotaManager)
+  }
+
+  @Test
+  def testCreateTopicsWithNonControllerAndRedirectionDisabled(): Unit = {

Review comment:
       Good to see the unit tests in here. I think we also need at least a couple of integration tests. For example, could we add something to `CreateTopicsRequestTest` to ensure that forwarding works as expected?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1733,68 +1817,109 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
-    val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+    val forwardRequestHandler = new ForwardRequestHandler[CreateTopicsRequest,
+      CreateTopicsResponse, String, CreatableTopic](request) {
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      override def resourceSplitByAuthorization(createTopicsRequest: CreateTopicsRequest):
+      (Map[String, CreatableTopic], Map[String, ApiError]) = {

Review comment:
       nit: this is subjective, but this style is a bit ugly. I would prefer the following:
   ```scala
   override def resourceSplitByAuthorization(
     createTopicsRequest: CreateTopicsRequest
   ): (Map[String, CreatableTopic], Map[String, ApiError]) = {
   ```
   That makes it easier visually to separate the return type and the function logic (again, in my opinion).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {
+      val requestBody = request.body[AbstractRequest].asInstanceOf[T]
+      val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody)
+      if (isForwardingRequest(request)) {
+        if (!controller.isActive) {
+          sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+          } else {
+            // For forwarding requests, the authentication failure is not caused by
+            // the original client, but by the broker.
+            val unauthorizedResult = unauthorizedResources.keys.map {
+              resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+            }.toMap
+
+            process(authorizedResources, unauthorizedResult, requestBody)
+          }
+      } else if (!controller.isActive && config.redirectionEnabled &&
+        authorizedResources.nonEmpty) {
+        redirectionManager.forwardRequest(
+          createRequestBuilder(authorizedResources, requestBody),
+          sendResponseMaybeThrottle,
+          request,
+          response => {
+            mergeResponse(response.responseBody.asInstanceOf[R], unauthorizedResources)
+          })
+      } else {
+        // When IBP is smaller than 2.7, forwarding is not supported,
+        // therefore requests are handled directly
+        process(authorizedResources, unauthorizedResources, requestBody)

Review comment:
       We can't guarantee that this broker will still be the controller when we call `process` or that the broker we're forwarding to will still be the controller when it receives the request. In these cases, we need to return some retriable error to the client. Can you help me understand how this is implemented?

##########
File path: core/src/main/scala/kafka/server/AdminManager.scala
##########
@@ -513,15 +513,21 @@ class AdminManager(val config: KafkaConfig,
     resource -> ApiError.NONE
   }
 
-  private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean,
-                                 configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
+  private def alterBrokerConfigs(resource: ConfigResource,
+                                 validateOnly: Boolean,
+                                 configProps: Properties,
+                                 configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val brokerId = getBrokerId(resource)
     val perBrokerConfig = brokerId.nonEmpty
     this.config.dynamicConfig.validate(configProps, perBrokerConfig)
     validateConfigPolicy(resource, configEntriesMap)
     if (!validateOnly) {
-      if (perBrokerConfig)
+      if (perBrokerConfig) {
+        val previousConfigProps = config.dynamicConfig.currentDynamicBrokerConfigs
         this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+        this.config.dynamicConfig.maybeAugmentSSLStorePaths(configProps, previousConfigProps)

Review comment:
       Can you explain why this change is needed?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+    R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging {
+
+    /**
+     * Split the given resource into authorized and unauthorized sets.
+     *
+     * @return authorized resources and unauthorized resources
+     */
+    def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError])
+
+    /**
+     * Controller handling logic of the request.
+     */
+    def process(authorizedResources: Map[RK, RV],
+                unauthorizedResult: Map[RK, ApiError],
+                request: T): Unit
+
+    /**
+     * Build a forward request to the controller.
+     *
+     * @param authorizedResources authorized resources by the forwarding broker
+     * @param request the original request
+     * @return forward request builder
+     */
+    def createRequestBuilder(authorizedResources: Map[RK, RV],
+                             request: T): AbstractRequest.Builder[T]
+
+    /**
+     * Merge the forward response with the previously unauthorized results.
+     *
+     * @param forwardResponse the forward request's response
+     * @param unauthorizedResult original unauthorized results
+     * @return combined response to the original client
+     */
+    def mergeResponse(forwardResponse: R,
+                      unauthorizedResult: Map[RK, ApiError]): R
+
+    def handle(): Unit = {

Review comment:
       nit: seems `handle` doesn't really need to be part of `ForwardRequestHandler`. Instead we could pull it out:
   ```scala
   private def handle(handler: ForwardRequestHandler): Unit = {
   ...
   ```
   The advantage of this is that it allows us to pull the type out of `KafkaApis` without inheriting all of the dependencies that are needed by `handle`.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsResponse.java
##########
@@ -25,23 +25,35 @@
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class IncrementalAlterConfigsResponse extends AbstractResponse {
 
-    public static IncrementalAlterConfigsResponseData toResponseData(final int requestThrottleMs,
-                                                                     final Map<ConfigResource, ApiError> results) {
-        IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData();
-        responseData.setThrottleTimeMs(requestThrottleMs);
-        for (Map.Entry<ConfigResource, ApiError> entry : results.entrySet()) {
-            responseData.responses().add(new AlterConfigsResourceResponse().
-                    setResourceName(entry.getKey().name()).
-                    setResourceType(entry.getKey().type().id()).
-                    setErrorCode(entry.getValue().error().code()).
-                    setErrorMessage(entry.getValue().message()));
-        }
-        return responseData;
+    public IncrementalAlterConfigsResponse(final int requestThrottleMs,
+                                           final Map<ConfigResource, ApiError> results) {
+        this.data = new IncrementalAlterConfigsResponseData()
+                        .setThrottleTimeMs(requestThrottleMs);
+
+        addResults(results);
+    }
+
+    public IncrementalAlterConfigsResponse addResults(final Map<ConfigResource, ApiError> results) {

Review comment:
       Typically responses are immutable after construction. It seems like a brittle pattern to rely on being able to mutate the response we receive from the other broker. For example, we inherit the throttle time, which is a bit weird. Are we saving that much by not creating a new response?
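    For illustration, a minimal sketch of merging into a brand-new response instead of mutating the forwarded one (hedged; the helper name and placement are hypothetical, and it relies only on the `(throttleTimeMs, results)` constructor shown in this diff):
    ```java
    // Hypothetical helper: combine the controller's results with the locally
    // rejected resources and build a fresh, immutable response, rather than
    // calling addResults(...) on the response received from the other broker.
    static IncrementalAlterConfigsResponse mergedResponse(
            final Map<ConfigResource, ApiError> controllerResults,
            final Map<ConfigResource, ApiError> unauthorizedResults,
            final int throttleTimeMs) {
        final Map<ConfigResource, ApiError> combined = new HashMap<>(controllerResults);
        combined.putAll(unauthorizedResults);
        return new IncrementalAlterConfigsResponse(throttleTimeMs, combined);
    }
    ```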




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511872018



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),

Review comment:
       I guess the only quota that is affected for the RPCs we currently forward is request quotas. Totally agree that we shouldn't throttle inter-broker connections. There are a few other things to consider here:
   
   1) Every forwarded request uses network thread and request handler time on two brokers. Are we saying that we can ignore the time spent on the forwarding broker because that is negligible? In a deployment with SSL on the external listener and PLAINTEXT on the inter-broker listener, there may be more network thread time used on the forwarding broker rather than the controller. Do we record these, but use the controller throttle time for throttling?
    2) Are we changing the semantics of quotas? For example, if a client sends a request1 to leastLoadedNode A which mutes the connection and then sends request2 to leastLoadedNode B that happens to be the controller, we would mute that connection too. Another client with the same principal would get muted on B, but not A, because A's quota hasn't been violated. I think this should be ok, though a bit confusing.
    3) Are these measures good enough to protect the controller? This is the one that needs some more thought. Request quotas are configured to allocate a percentage of thread usage to each principal. Our quotas aren't very good at protecting against DOS attacks, but they help to limit usage for normal clients using the APIs. So if we can make sure the implementation for forwarded requests can handle this case, it would be good enough. In the old world, a client doing a lot of config updates would have just distributed the load across brokers as each node was throttled. Now, we distribute the initial request across brokers as the controller decides to throttle. The total rate for these requests across the cluster is dramatically reduced because all load is now on the controller. But from the controller broker's point of view, we are now allowing more requests through for the same quota from every client, because a client can forward through `n` brokers. @apovzner may have more context on whether these request types actually hit request quotas in real deployments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r476860796



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
##########
@@ -87,6 +87,16 @@ public Builder(Map<ConfigResource, Config> configs, boolean validateOnly) {
         public AlterConfigsRequest build(short version) {
             return new AlterConfigsRequest(data, version);
         }
+

Review comment:
       The purpose is to let the mock tests in `KafkaApisTest` compare against the expected builder.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475710400



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationFailureException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a broker side authorization failure during request redirection.
+ */
+public class BrokerAuthorizationFailureException extends AuthorizationException {
+

Review comment:
       Need to include:
   ```
   private static final long serialVersionUID = 1L;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475725389



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -316,7 +316,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
         kafkaController.startup()
 
-        brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix)
+        if (config.redirectionEnabled) {
+          brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix)

Review comment:
       Just as a note, the AlterIsr PR may also have an object like this, so maybe we want a name that is more specific to redirection.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-696225206


   Hi @abbccdda ,
   
   Thanks for the PR!  It looks good.  I like the idea behind `ForwardRequestHandler`.
   
   Since this class doesn't have any internal state, I wonder if it would be more Scala-like to just have a function that takes some callbacks as arguments.
   
   Can we get rid of the need for the `customizedAuthorizationError` callback by having `resourceSplitByAuthorization` return a `Map[RK, ApiError]` instead of `Map[RK, RV]`?  When would RV be needed for keys where authorization failed?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-696281629


   @cmccabe Sounds good to me to remove the customized error helper.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493771144



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + configName
+              if (!processedFiles.contains(prefixedName) && configProps.containsKey(prefixedName) &&
+                configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName, ""))) {
+                val equivalentFileName = configProps.getProperty(prefixedName).replace("/", "//")

Review comment:
       Yes, we would trim it in `trimSslStorePaths`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513613435



##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.

Review comment:
       nit: for the purpose of inter-broker forwarding

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {

Review comment:
       Since this is a public API, it's worth documenting that these apis should raise a consistent error, such as `SerializationException`, in case of an error.
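    A hedged sketch of how that contract could be documented (assuming `org.apache.kafka.common.errors.SerializationException` is the agreed error type; since it is unchecked, documenting it in the javadoc is enough):
    ```java
    public interface KafkaPrincipalSerde {

        /**
         * Serialize the given principal for inter-broker forwarding.
         *
         * @throws SerializationException if the principal cannot be serialized
         */
        ByteBuffer serialize(KafkaPrincipal principal);

        /**
         * Deserialize a principal previously written by {@link #serialize(KafkaPrincipal)}.
         *
         * @throws SerializationException if the bytes cannot be deserialized
         */
        KafkaPrincipal deserialize(ByteBuffer bytes);
    }
    ```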

##########
File path: clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
##########
@@ -59,7 +59,8 @@ public void testResponseThrottleTime() {
         for (ApiKeys apiKey: ApiKeys.values()) {
             Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion());
             BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
-            if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+            // Envelope could be throttled, even though it requires cluster action.
+            if (apiKey != ApiKeys.ENVELOPE && (apiKey.clusterAction || authenticationKeys.contains(apiKey)))

Review comment:
       I guess this shows an inconsistency between the envelope and the other inter-broker APIs. The throttle time field is only useful if we actually expect the forwarding broker to respect it and back off. I wonder if we should just be consistent for now and leave this out.

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,12 +37,6 @@
     // Since the client is sending the ApiVersionsRequest in order to discover what
     // versions are supported, the client does not know the best version to use.
     { "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." },
-    { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+",

Review comment:
       It looks like these changes made it to 2.7. We need to revert them before the release or it will not be safe to remove them. The danger is that we might use these tag ids for another purpose in the future, which will break the request parsing.

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
       Since principals should be small, it is tempting to just use simple byte arrays for this interface. This is typically simpler for users and gives us a stronger boundary between plugin and broker code. 
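    For comparison, a short sketch of the byte-array variant this suggests; copying into and out of `byte[]` keeps plugin code from retaining references into broker-owned buffers:
    ```java
    public interface KafkaPrincipalSerde {
        byte[] serialize(KafkaPrincipal principal);
        KafkaPrincipal deserialize(byte[] bytes);
    }
    ```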

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Type", "type": "string", "versions": "0+",
+      "about": "The principal type"},
+    {"name": "Name", "type": "string", "versions": "0+",
+      "about": "The principal name"},
+    {"name": "tokenAuthenticated", "type": "bool", "versions": "0+",
+      "about": "Whether the given principal is token authenticated."}

Review comment:
       Perhaps add a little more detail?
   
   > Whether the principal was authenticated by a delegation token on the forwarding broker

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.

Review comment:
       Might be worth mentioning `org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder` explicitly.

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Type", "type": "string", "versions": "0+",
+      "about": "The principal type"},
+    {"name": "Name", "type": "string", "versions": "0+",
+      "about": "The principal name"},
+    {"name": "tokenAuthenticated", "type": "bool", "versions": "0+",

Review comment:
       nit: use upper-case `TokenAuthenticated` for consistency with other fields

##########
File path: clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
##########
@@ -34,6 +34,11 @@ public void testDelayedAllocationSchemaDetection() throws Exception {
                 case EXPIRE_DELEGATION_TOKEN:
                 case RENEW_DELEGATION_TOKEN:
                 case ALTER_USER_SCRAM_CREDENTIALS:
+                case ENVELOPE:
+                case ALTER_CONFIGS:
+                case INCREMENTAL_ALTER_CONFIGS:
+                case ALTER_CLIENT_QUOTAS:

Review comment:
       Would it make sense to add a default rule? If the API is forwardable, then we can assert that it requires delayed allocation.
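    A hedged sketch of such a default rule (assuming the `forwardable` flag this PR adds to `ApiKeys`, the existing `requiresDelayedAllocation` field, and JUnit 4 style asserts):
    ```java
    for (ApiKeys apiKey : ApiKeys.values()) {
        if (apiKey.forwardable) {
            // Forwardable requests are re-serialized into an Envelope, so their
            // underlying buffers must stay allocated until processing is done.
            assertTrue(apiKey + " should require delayed allocation",
                apiKey.requiresDelayedAllocation);
        }
    }
    ```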

##########
File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
##########
@@ -167,4 +171,26 @@ public void close() {
             oldPrincipalBuilder.close();
     }
 
+    @Override
+    public ByteBuffer serialize(KafkaPrincipal principal) {
+        DefaultPrincipalData data = new DefaultPrincipalData()
+                                        .setType(principal.getPrincipalType())
+                                        .setName(principal.getName())
+                                        .setTokenAuthenticated(principal.tokenAuthenticated());
+        Struct dataStruct = data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        ByteBuffer buffer = ByteBuffer.allocate(2 + dataStruct.sizeOf());
+        buffer.putShort(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        dataStruct.writeTo(buffer);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public KafkaPrincipal deserialize(ByteBuffer bytes) {
+        short version = bytes.getShort();
+        DefaultPrincipalData data = new DefaultPrincipalData(
+            DefaultPrincipalData.SCHEMAS[version].read(bytes),

Review comment:
       We may as well add a check here for the version so that we get a useful error in case we receive a version that we do not support.
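    A hedged sketch of the version guard (the generated `DefaultPrincipalData` exposes `LOWEST_SUPPORTED_VERSION`/`HIGHEST_SUPPORTED_VERSION`; the exception type is an assumption):
    ```java
    short version = bytes.getShort();
    if (version < DefaultPrincipalData.LOWEST_SUPPORTED_VERSION
            || version > DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION) {
        // Fail fast with a clear error instead of an ArrayIndexOutOfBoundsException
        // from indexing DefaultPrincipalData.SCHEMAS with an unknown version.
        throw new SerializationException("Invalid principal data version " + version);
    }
    ```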

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -104,7 +108,9 @@ object ApiVersion {
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
     // Introduced AlterIsr (KIP-497)
-    KAFKA_2_7_IV2
+    KAFKA_2_7_IV2,
+    // Introduced IBP based constraints for ApiVersion (KIP-590)

Review comment:
       It's not clear to me why we need to do this now since we are not enabling forwarding yet.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),

Review comment:
       In fact, the schema doc says that the response header should be included.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),
+            error
+          )
+
+          envelopeContext.brokerContext.buildResponse(envelopeResponse)
+        case None =>
+          context.buildResponse(abstractResponse)
+      }
+    }
+
+    def responseString(response: AbstractResponse): Option[String] = {
+      if (RequestChannel.isRequestLoggingEnabled)
+        Some(envelopeContext match {
+          case Some(envelopeContext) =>
+            response.toString(envelopeContext.brokerContext.apiVersion)

Review comment:
       Hmm.. The request logging will not be too useful if we cannot see what is in the embedded request and response. I think we should print the envelope structures separately. Longer term, we should figure out how to incorporate the envelope into https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema. 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+        case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest)

Review comment:
       Not sure why this was resolved. I don't see the check. Basically, the first thing we should do in `handle` is check whether we have an envelope request and, if so, whether it is authorized.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-722005317


   Mvn failure is not related, merging


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510568706



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
       The question would be how the forwarding broker should handle auth and principal serde exceptions. To me, we should return a vanilla error response with `UNKNOWN_SERVER_ERROR` to the original client. Beyond that, I think we could differentiate the cases here to avoid passing serde-type errors through to the client.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465273903



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()

Review comment:
       Ack. I have missed the handling of `NOT_CONTROLLER` in the `BrokerToControllerChannelManager`.
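
   For reference, the `NOT_CONTROLLER` handling being referred to could look roughly like this inside the channel manager's response handler (`metadataUpdater` and `retryQueue` are illustrative names, not the actual fields):

   ```scala
   // Sketch only: on NOT_CONTROLLER, rediscover the active controller and
   // retry instead of propagating the error to the caller.
   override def onComplete(response: ClientResponse): Unit = {
     if (response.responseBody.errorCounts.containsKey(Errors.NOT_CONTROLLER)) {
       metadataUpdater.requestUpdate()    // refresh the cached controller id
       retryQueue.add(requestAndCallback) // resend once metadata is refreshed
     } else {
       callback.onComplete(response)
     }
   }
   ```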




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r466714423



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2982,12 +3089,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+            requestContext.maybeFromControlPlane &&
+              authorizeAction(requestContext, CLUSTER_ACTION,
+                resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)

Review comment:
       Actually I think you are right, will change here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465274018



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2599,13 +2664,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources, alterConfigsRequest.data.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new IncrementalAlterConfigsResponse(requestThrottleMs, results.asJava))
+    }
+
+    def notControllerResponse(): Unit = {
+      val errorResult = configs.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {

Review comment:
       Ack. This is what I thought.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-693757638


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-713244123


   test this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r511012913



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1005,6 +1013,36 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leaving the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    // The forwarding broker and the active controller should have the same DNS resolution, and we need
+    // to have the original client address for authentication purpose.
+    val originalClientAddress = InetAddress.getByName(envelopeRequest.clientHostName)

Review comment:
       So the proposal is simply to save the unnecessary DNS translation? I'm not sure whether representing the address as bytes would serve the security purpose as well.
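
   For context on the trade-off: `InetAddress.getByName` can trigger a DNS lookup when handed a hostname, while `getByAddress` only validates the byte length and never touches DNS. A sketch, assuming a hypothetical `clientHostAddress` bytes field in the envelope:

   ```scala
   import java.net.InetAddress

   // May hit DNS if the string is a hostname rather than an IP literal.
   val viaName = InetAddress.getByName(envelopeRequest.clientHostName)

   // Never performs a lookup; accepts the raw 4- or 16-byte address directly.
   val viaBytes = InetAddress.getByAddress(envelopeRequest.clientHostAddress)
   ```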




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465273597



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[AlterConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+    val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+    val (authorizedResources, unauthorizedResources) = requestResources.partition { case (resource, _) =>
       resource.`type` match {
         case ConfigResource.Type.BROKER_LOGGER =>
-          throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+          throw new InvalidRequestException(
+            s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
         case ConfigResource.Type.BROKER =>
           authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
         case rt => throw new InvalidRequestException(s"Unexpected resource type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+
+    def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+      sendResponseMaybeThrottle(request, requestThrottleMs =>
+        new AlterConfigsResponse(results.asJava, requestThrottleMs))
     }
-    def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-      val data = new AlterConfigsResponseData()
-        .setThrottleTimeMs(requestThrottleMs)
-      (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) =>
-        data.responses().add(new AlterConfigsResourceResponse()
-          .setErrorCode(error.error.code)
-          .setErrorMessage(error.message)
-          .setResourceName(resource.name)
-          .setResourceType(resource.`type`.id))
+
+    def notControllerResponse(): Unit = {
+      val errorResult = requestResources.keys.map {
+        resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+      }.toMap
+
+      sendResponseCallback(errorResult)
+    }
+
+    if (isForwardingRequest(request)) {
+      if (!controller.isActive) {
+        notControllerResponse()
+      } else {
+        val authorizedResult = adminManager.alterConfigs(
+          authorizedResources, alterConfigsRequest.validateOnly)
+        // For forwarding requests, the authentication failure is not caused by
+        // the original client, but by the broker.
+        val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+          resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+        }
+
+        sendResponseCallback(authorizedResult ++ unauthorizedResult)
+      }
+    } else if (!controller.isActive && config.redirectionEnabled) {
+      val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+        authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+      brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+        new ForwardedAlterConfigsRequestCompletionHandler(request,
+          unauthorizedResources.keys.map { resource =>
+            resource -> configsAuthorizationApiError(resource)
+          }.toMap),
+        request.header.initialPrincipalName,
+        request.header.initialClientId)
+    } else {
+      // When IBP is low, we would just handle the config request, as the admin client
+      // doesn't know how to find the controller.
+      val authorizedResult = adminManager.alterConfigs(
+        authorizedResources, alterConfigsRequest.validateOnly)
+      val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+        resource -> configsAuthorizationApiError(resource)
       }
-      new AlterConfigsResponse(data)
+
+      sendResponseCallback(authorizedResult ++ unauthorizedResult)
+    }
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+    request.header.initialPrincipalName != null &&
+      request.header.initialClientId != null &&
+      request.context.fromControlPlane

Review comment:
       Sorry, I was not clear. If the control plane listener is not configured, control requests will go to the data plane listener. Based on your last commits, it seems that you have figured that out.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda closed pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda closed pull request #9103:
URL: https://github.com/apache/kafka/pull/9103


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r489875283



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1542,6 +1542,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   /** ********* Feature configuration ***********/
   def isFeatureVersioningEnabled = interBrokerProtocolVersion >= KAFKA_2_7_IV0
 
+  /** ********* Redirection configuration ***********/
+  def redirectionEnabled = interBrokerProtocolVersion >= KAFKA_2_7_IV1

Review comment:
       I'm still trying to decide how to make sure we can turn off redirection in 2.7. Having a separate IBP for 3.0 may not work. @cmccabe 
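
   One possible shape for an explicit opt-out on top of the IBP gate, sketched with a hypothetical `redirection.enable` config that does not exist in this PR:

   ```scala
   // Sketch only: let operators on 2.7 turn redirection off even when the
   // IBP would otherwise enable it.
   def redirectionEnabled: Boolean =
     interBrokerProtocolVersion >= KAFKA_2_7_IV1 &&
       getBoolean("redirection.enable")
   ```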




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504408633



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -43,15 +43,43 @@
  */
 public class ApiVersionsResponse extends AbstractResponse {
 
+    public static final int MIN_CONSTRAINT_IBP_VERSION = 31;

Review comment:
       The tricky thing here is that if we handle the API version constraints on the broker side, we need to either mutate the returned ApiVersionsResponse directly or spawn a new instance with the constraints applied. That leaks the internal structure of ApiVersionsResponse into the broker layer and adds redundant conversions, IMHO. The current approach keeps the broker-level logic clean; it only needs to pass the IBP number.
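
   A sketch of the alternative being argued against, i.e. the broker clamping the advertised ranges itself (`maxVersionAllowedByIbp` and the setter usage are illustrative):

   ```scala
   // Sketch only: intersect each advertised max version with an IBP-derived cap.
   apiVersionsResponse.data.apiKeys.asScala.foreach { apiVersion =>
     maxVersionAllowedByIbp.get(apiVersion.apiKey).foreach { cap =>
       apiVersion.setMaxVersion(math.min(apiVersion.maxVersion, cap).toShort)
     }
   }
   ```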




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r465908873



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2982,12 +3089,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+            requestContext.maybeFromControlPlane &&
+              authorizeAction(requestContext, CLUSTER_ACTION,
+                resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)

Review comment:
       I wonder if this is correct. Usually, we use `CLUSTER_ACTION` action with the `CLUSTER` resource. For instance, this is how we authorize control requests:
   ```
   authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)
   ```
   I thought that we would do the same in this case. Don't we?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r504345664



##########
File path: clients/src/main/resources/common/message/EnvelopeRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 58,
+  "type": "request",
+  "name": "EnvelopeRequest",
+  // Request struct for redirection.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "RequestData", "type": "bytes", "versions": "0+", "zeroCopy": true,
+      "about": "The embedded request header and data."},
+    { "name": "PrincipalIdToken", "type": "bytes", "tag": 0, "taggedVersions": "0+" },

Review comment:
       Not yet, could be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r466582281



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2982,12 +3089,33 @@ class KafkaApis(val requestChannel: RequestChannel,
                                 logIfDenied: Boolean = true,
                                 refCount: Int = 1): Boolean = {
     authorizer.forall { authZ =>
-      val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
-      val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
-      authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
+      if (authorizeAction(requestContext, operation,
+        resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)) {
+        true
+      } else {
+        operation match {
+          case ALTER | ALTER_CONFIGS | CREATE | DELETE =>
+            requestContext.maybeFromControlPlane &&
+              authorizeAction(requestContext, CLUSTER_ACTION,
+                resourceType, resourceName, logIfAllowed, logIfDenied, refCount, authZ)

Review comment:
       I'm not sure either, cc @rajinisivaram @cmccabe 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493209974



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
    // Bump Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment:
       But in case we release AK 2.7, wouldn't this flag give users the confidence to upgrade, which we don't want to happen?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda merged pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9103:
URL: https://github.com/apache/kafka/pull/9103


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r475712573



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -325,7 +326,9 @@
     UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new),
     THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new),
     PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " +
-            "which fences the current one.", ProducerFencedException::new);
+            "which fences the current one.", ProducerFencedException::new),
+    BROKER_AUTHORIZATION_FAILURE(91, "Authorization failed for the request during forwarding. " +

Review comment:
       How about: "A broker failed to authorize itself to another component of the system. This indicates an internal error in the broker cluster's security setup".
   
   This isn't specific to forwarding... there might be other reasons why a broker would need to authorize itself and fail.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516951766



##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationWithForwardingIntegrationTest.scala
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import org.junit.Test
+
+/**
+ * Integration test suite for forwarding mechanism applied on AlterConfigs.
+ * This class basically reused everything from {@link DynamicBrokerReconfigurationTest}
+ * with the KIP-500 mode enabled for sasl listener alter test.
+ */
+class DynamicBrokerReconfigurationWithForwardingIntegrationTest extends DynamicBrokerReconfigurationTest {

Review comment:
       Yeah, that's weird; let's move the discussion to the next PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r505842904



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
    INCONSISTENT_VOTER_SET(94, "Indicates that either the sender or recipient of a " +
             "voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " +

Review comment:
       `CLUSTER_AUTHORIZATION_FAILURE` normally indicates a client-side security configuration error. We intentionally define a separate error code to let admins know that the security config trouble is with the brokers, not the clients.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493742677



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -103,6 +103,9 @@ object ApiVersion {
     KAFKA_2_7_IV0,
    // Bump Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
+    // Enable redirection (KIP-590)
+    // TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze

Review comment:
       I'm not sure I follow. Do you not want redirection to be part of 2.7?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510533897



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+                val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment:
       You probably need the following:
   ```scala
   import scala.compat.java8.OptionConverters._
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r476864248



##########
File path: clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationFailureException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a broker side authorization failure during request redirection.
+ */
+public class BrokerAuthorizationFailureException extends AuthorizationException {
+

Review comment:
       Interesting, why does the `AuthorizationException` have no `serialVersionUID`? Is it because we never use that error code explicitly?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509670268



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the forwarded request did not arrive on a privileged listener, or it fails
+      // the CLUSTER_ACTION authorization, we reject the request.
+      sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined && !controller.isActive) {
+      sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
+    } else if (!controller.isActive && couldDoRedirection(request)) {
+      redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def couldDoRedirection(request: RequestChannel.Request): Boolean =

Review comment:
       Sounds good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r505765664



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
    INCONSISTENT_VOTER_SET(94, "Indicates that either the sender or recipient of a " +
             "voter-only request is not one of the expected voters", InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " +

Review comment:
       What is the benefit of using a different error code instead of `CLUSTER_AUTHORIZATION_FAILURE`?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -251,7 +253,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
         DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
     ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
     UPDATE_FEATURES(57, "UpdateFeatures",
-        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS);
+        UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS),
+    ENVELOPE(58, "Envelope", EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);

Review comment:
       I believe we need to set `requiresDelayedAllocation` for this API. Typically we will release the underlying buffer allocated for a request when `RequestChannel.Request` is constructed. However, since we are using "zeroCopy," we need to hold onto the `ByteBuffer` reference until the API has been handled.
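
   To spell out the mechanics: the buffer is normally returned to the memory pool right after the request is parsed, so an API flagged with `requiresDelayedAllocation` has to defer that release. Roughly (a sketch of the idea, not the exact `RequestChannel` code):

   ```scala
   // Sketch only: eager release is safe once a request is fully parsed, but
   // an Envelope's zero-copy ByteBuffer still points into this buffer, so
   // its release must wait until the API has been handled.
   if (!header.apiKey.requiresDelayedAllocation)
     releaseBuffer()
   ```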

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -121,6 +121,33 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   */
+  private[server] abstract class ForwardRequestHandler(request: RequestChannel.Request, isForwardRequest: Boolean,

Review comment:
       It seems like we're trying to reuse this handler from the previous patch, but I'm not sure it still makes as much sense. A simpler structure might be something like the following:
   
   ```scala
     private def maybeForward(
       request: RequestChannel.Request,
       handler: RequestChannel.Request => Unit
     ): Unit = {
       if (!controller.isActive && config.redirectionEnabled && request.context.principalSerde.isPresent) {
         redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
       } else {
         // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported,
         // therefore requests are handled directly.
         handler(request)
       }
     }
   
     // then invoked like this
   override def handle(request: RequestChannel.Request): Unit = {
       try {
         trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
           s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
   
         request.header.apiKey match {
         ...
           case ApiKeys.ALTER_CONFIGS => maybeForward(request, handleAlterConfigsRequest)
   ...
   
   
     // unchanged
     def handleAlterConfigs(request): Unit 
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org