Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/08 19:35:24 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10505: MINOR: fix some bugs in ControllerApis.scala

cmccabe opened a new pull request #10505:
URL: https://github.com/apache/kafka/pull/10505


   Fix some cases where ControllerApis was blocking on the controller
   thread.  This should not be necessary, since the controller thread can
   just interface directly with the network threads.
   
   alterClientQuotas and incrementalAlterConfigs were not doing
   authorization correctly in ControllerApis.scala.  Since the previous
   release of KRaft did not support authorizers, this bug is not as severe
   as it could have been, but it still needs to be fixed.  This PR also
   adds unit tests to verify that all of the controller operations return
   authorization failures when appropriate.
   
   Additionally, this PR fixes a comment in ControllerApis#deleteTopics
   that no longer reflects what the code is doing when we don't have
   "describe" permission.
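
As a rough illustration of the first point, the sketch below contrasts waiting on a future with composing callbacks onto it. It is not the code from this PR: `findTopicNames` and `deleteTopic` here are hypothetical stand-ins for controller calls, and `join()` is used in place of `get()` only to avoid checked exceptions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class NonBlockingLookup {
    // Hypothetical stand-in for a controller call that completes on another thread.
    static CompletableFuture<Map<String, String>> findTopicNames() {
        return CompletableFuture.supplyAsync(
            () -> Collections.singletonMap("id-1", "topic-a"));
    }

    // Hypothetical stand-in for a follow-up controller call.
    static CompletableFuture<String> deleteTopic(String name) {
        return CompletableFuture.completedFuture("deleted " + name);
    }

    // Blocking style: the calling thread waits inside join() until each step finishes.
    static String blockingStyle() {
        Map<String, String> names = findTopicNames().join();
        return deleteTopic(names.get("id-1")).join();
    }

    // Non-blocking style: the second step is registered as a callback, and the
    // combined future is returned to the caller without waiting.
    static CompletableFuture<String> composedStyle() {
        return findTopicNames().thenCompose(names -> deleteTopic(names.get("id-1")));
    }
}
```

In the composed version the request handler can return as soon as the callbacks are registered; whoever holds the returned future decides when, or whether, to wait on it.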


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#issuecomment-819907811


   > @cmccabe thanks for this nice patch. It seems this patch includes several different targets. Could you update the description to include them (for example, this patch adds new APIs to the controller)?
   
   Added.





[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612742924



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -517,25 +540,67 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
-    authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-    val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
+    val configChanges = new util.HashMap[ConfigResource,
+      util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
     alterConfigsRequest.data.resources.forEach { resource =>
-      val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
-      val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
+      val configResource = new ConfigResource(
+        ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
+      val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
       resource.configs.forEach { config =>
         altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
           AlterConfigOp.OpType.forId(config.configOperation), config.value))
       }
       configChanges.put(configResource, altersByName)
     }
+    val results = new util.HashMap[ConfigResource, ApiError]
+    val iterator = configChanges.keySet().iterator()
+    while (iterator.hasNext) {
+      val resource = iterator.next()
+      val apiError = resource.`type` match {
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
+          if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+            new ApiError(NONE)
+          } else {
+            new ApiError(CLUSTER_AUTHORIZATION_FAILED)

Review comment:
       Yes, we could be authorized to change some things but not others.
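
A minimal sketch of that idea, assuming a hypothetical `isAllowed` predicate in place of the real authorizer and plain strings in place of Kafka's `ApiError`: each resource gets its own result, so a denial on one resource does not fail the others.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PerResourceAuth {
    // Returns one entry per resource: "NONE" when the caller is allowed to alter it,
    // "AUTHORIZATION_FAILED" otherwise. Both strings are illustrative placeholders.
    static Map<String, String> authorizeEach(List<String> resources,
                                             Predicate<String> isAllowed) {
        Map<String, String> results = new LinkedHashMap<>();
        for (String resource : resources) {
            results.put(resource, isAllowed.test(resource) ? "NONE" : "AUTHORIZATION_FAILED");
        }
        return results;
    }
}
```

Only the resources mapped to "NONE" would then be forwarded to the controller; the rest are answered directly with their error.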







[GitHub] [kafka] cmccabe merged pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10505:
URL: https://github.com/apache/kafka/pull/10505


   





[GitHub] [kafka] chia7712 commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613807365



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       > For some reason leaving out the braces doesn't work here.
   
   Pardon me, did you mean that it fails to compile? The following code compiles on my local machine (and `ControllerApisTest` passes):
   ```scala
       controller.findTopicNames(providedIds).thenCompose { topicNames =>
         topicNames.forEach { (id, nameOrError) =>
           if (nameOrError.isError) {
             appendResponse(null, id, nameOrError.error())
           } else {
             toAuthenticate.add(nameOrError.result())
             idToName.put(id, nameOrError.result())
           }
         }
         // Get the list of deletable topics (those we can delete) and the list of describeable
         // topics.
         val topicsToAuthenticate = toAuthenticate.asScala
         val (describeable, deletable) = if (hasClusterAuth) {
           (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
         } else {
           (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
         }
         // For each topic that was provided by ID, check if authentication failed.
         // If so, remove it from the idToName map and create an error response for it.
         val iterator = idToName.entrySet().iterator()
         while (iterator.hasNext) {
           val entry = iterator.next()
           val id = entry.getKey
           val name = entry.getValue
           if (!deletable.contains(name)) {
             if (describeable.contains(name)) {
               appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
             } else {
               appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
             }
             iterator.remove()
           }
         }
         // For each topic that was provided by name, check if authentication failed.
         // If so, create an error response for it. Otherwise, add it to the idToName map.
         controller.findTopicIds(providedNames).thenCompose { topicIds =>
           topicIds.forEach { (name, idOrError) =>
             if (!describeable.contains(name)) {
               appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
             } else if (idOrError.isError) {
               appendResponse(name, ZERO_UUID, idOrError.error)
             } else if (deletable.contains(name)) {
               val id = idOrError.result()
               if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
                 // This is kind of a weird case: what if we supply topic ID X and also a name
                 // that maps to ID X?  In that case, _if authorization succeeds_, we end up
                 // here.  If authorization doesn't succeed, we refrain from commenting on the
                 // situation since it would reveal topic ID mappings.
                 duplicateProvidedIds.add(id)
                 idToName.remove(id)
                 appendResponse(name, id, new ApiError(INVALID_REQUEST,
                   "The provided topic name maps to an ID that was already supplied."))
               }
             } else {
               appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
             }
           }
           // Finally, the idToName map contains all the topics that we are authorized to delete.
           // Perform the deletion and create responses for each one.
           controller.deleteTopics(idToName.keySet).thenApply { idToError =>
             idToError.forEach { (id, error) =>
               appendResponse(idToName.get(id), id, error)
             }
             // Shuffle the responses so that users can not use patterns in their positions to
             // distinguish between absent topics and topics we are not permitted to see.
             Collections.shuffle(responses)
             responses
           }
         }
       }
   ```







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613455255



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       For some reason leaving out the braces doesn't work here.  Maybe it's because this block needs to be translated into a `java.util.function.Function` object.







[GitHub] [kafka] chia7712 commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614266078



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       Sure. I will file a PR to deal with it if I find a way.







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613551585



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1023,6 +1027,22 @@ private QuorumController(LogContext logContext,
         });
     }
 
+    @Override
+    public CompletableFuture<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException());

Review comment:
       Hmm... well, it's going to be supported soon, so I'd rather keep it this way for now.
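
For readers unfamiliar with the pattern in the hunk above, here is a small sketch of how an already-failed future behaves for callers. The class and method names are hypothetical; only the `CompletableFuture` calls are standard JDK API.

```java
import java.util.concurrent.CompletableFuture;

public class FailedFutureExample {
    // Mirrors the pattern in the diff: hand back a future that has already failed.
    static CompletableFuture<String> unsupported() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }

    // A caller can turn the failure into a value with handle() instead of catching it.
    static String describe(CompletableFuture<String> future) {
        return future
            .handle((value, error) ->
                error == null ? value : "failed: " + error.getClass().getSimpleName())
            .join();
    }
}
```

Calling `join()` directly on such a future throws a `CompletionException` whose cause is the original `UnsupportedOperationException`, so the failure surfaces wherever the result is finally consumed rather than at the call site.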







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612738586



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
-import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors._

Review comment:
       I think this makes it more readable.  Long lines make reading and merging difficult.







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614264455



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       When I remove the braces I get this compiler error:
   ```
   > Task :core:compileScala FAILED
   [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:270: ')' expected but 'val' found.
   [Error] /home/cmccabe/src/kafka1/core/src/main/scala/kafka/server/ControllerApis.scala:278: ';' expected but 'val' found.
   two errors found
   ```
   
   Maybe let's follow up on this in a separate PR, if you can find a way to remove some of the braces and still have it compile...







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613455861



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -505,9 +529,8 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)

Review comment:
       It doesn't really matter either way.  I will move it after the typecast to make it more consistent.







[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612039100



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -343,21 +366,22 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    val response = controller.createTopics(effectiveRequest).get()
-    duplicateTopicNames.forEach { name =>
-      response.topics().add(new CreatableTopicResult().
-        setName(name).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Found multiple entries for this topic."))
-    }
-    topicNames.forEach { name =>
-      if (!authorizedTopicNames.contains(name)) {
+    controller.createTopics(effectiveRequest).thenApply(response => {
+      duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
-          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+          setErrorCode(INVALID_REQUEST.code()).
+          setErrorMessage("Found multiple entries for this topic."))
       }
-    }
-    response
+      topicNames.forEach { name =>
+        if (!authorizedTopicNames.contains(name)) {
+          response.topics().add(new CreatableTopicResult().
+            setName(name).
+            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))

Review comment:
       nit: here and in other places, we don't need parens for `code()`

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -151,6 +221,73 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testUnauthorizedHandleAlterClientQuotas(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
+          new AlterClientQuotasRequestData(), 0))))
+  }
+
+  @Test
+  def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = {
+    val requestData = new IncrementalAlterConfigsRequestData().setResources(
+      new AlterConfigsResourceCollection(
+        util.Arrays.asList(new IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
+            setName("log.cleaner.backoff.ms").

Review comment:
       Should we use the static KafkaConfig property definitions instead of these strings?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
-import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors._

Review comment:
       In KafkaApis we import `Errors` rather than importing all the members of the enum via a wildcard import. Any reason to prefer one way over the other? It seems more common in our code base to import the enum and refer to members like `Errors.ILLEGAL_SASL_STATE`

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication failed.
-    // If so, create an error response for it.  Otherwise, add it to the idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
-          // This is kind of a weird case: what if we supply topic ID X and also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we end up
-          // here.  If authorization doesn't succeed, we refrain from commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their positions to
-    // distinguish between absent topics and topics we are not permitted to see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication failed.
+      // If so, remove it from the idToName map and create an error response for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication failed.
+      // If so, create an error response for it.  Otherwise, add it to the idToName map.
+      controller.findTopicIds(providedNames).thenCompose(topicIds => {
+        topicIds.forEach { (name, idOrError) =>
+          if (!describeable.contains(name)) {
+            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else if (idOrError.isError) {
+            appendResponse(name, ZERO_UUID, idOrError.error)
+          } else if (deletable.contains(name)) {
+            val id = idOrError.result()
+            if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
+              // This is kind of a weird case: what if we supply topic ID X and also a name
+              // that maps to ID X?  In that case, _if authorization succeeds_, we end up
+              // here.  If authorization doesn't succeed, we refrain from commenting on the
+              // situation since it would reveal topic ID mappings.
+              duplicateProvidedIds.add(id)
+              idToName.remove(id)
+              appendResponse(name, id, new ApiError(INVALID_REQUEST,
+                "The provided topic name maps to an ID that was already supplied."))
+            }
+          } else {
+            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+        }
+        // Finally, the idToName map contains all the topics that we are authorized to delete.
+        // Perform the deletion and create responses for each one.
+        controller.deleteTopics(idToName.keySet).thenApply(idToError => {
+          idToError.forEach { (id, error) =>
+            appendResponse(idToName.get(id), id, error)
+          }
+          // Shuffle the responses so that users can not use patterns in their positions to
+          // distinguish between absent topics and topics we are not permitted to see.
+          Collections.shuffle(responses)
+          responses
+        })
+      })
+    })
   }
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
-    val responseData = createTopics(request.body[CreateTopicsRequest].data(),
+    val createTopicsRequest = request.body[CreateTopicsRequest]
+    val future = createTopics(createTopicsRequest.data(),
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity))
-    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
-      responseData.setThrottleTimeMs(throttleTimeMs)
-      new CreateTopicsResponse(responseData)
+    future.whenComplete((result, exception) => {
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        if (exception != null) {
+          createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
+        } else {
+          result.setThrottleTimeMs(throttleTimeMs)
+          new CreateTopicsResponse(result)
+        }
+      })
     })
   }
 
   def createTopics(request: CreateTopicsRequestData,
                    hasClusterAuth: Boolean,
-                   getCreatableTopics: Iterable[String] => Set[String]): CreateTopicsResponseData = {
+                   getCreatableTopics: Iterable[String] => Set[String])
+                   : CompletableFuture[CreateTopicsResponseData] = {

Review comment:
       Indentation seems weird. (I don't really know if it's correct or not)

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -517,25 +540,67 @@ class ControllerApis(val requestChannel: RequestChannel,
 
   def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
     val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
-    authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-    val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
+    val configChanges = new util.HashMap[ConfigResource,
+      util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
     alterConfigsRequest.data.resources.forEach { resource =>
-      val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
-      val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
+      val configResource = new ConfigResource(
+        ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
+      val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
       resource.configs.forEach { config =>
         altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
           AlterConfigOp.OpType.forId(config.configOperation), config.value))
       }
       configChanges.put(configResource, altersByName)
     }
+    val results = new util.HashMap[ConfigResource, ApiError]
+    val iterator = configChanges.keySet().iterator()
+    while (iterator.hasNext) {
+      val resource = iterator.next()
+      val apiError = resource.`type` match {
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
+          if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
+            new ApiError(NONE)
+          } else {
+            new ApiError(CLUSTER_AUTHORIZATION_FAILED)

Review comment:
       If we get an authorization error for one resource, do we still continue processing the remaining config items?
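To make the question concrete, here is a minimal sketch of one way a per-resource check could keep going after a failure. It only illustrates the pattern hinted at by the `results` map and the iterator in the hunk above; it is not taken from the PR. The class name, the `rejectUnauthorized` helper, the string-typed resources, and the predicate are all hypothetical stand-ins for `ConfigResource`, `ApiError`, and `authHelper.authorize`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: plain strings stand in for ConfigResource and ApiError.
class PerResourceAuthSketch {
    // Removes unauthorized resources from `changes` and returns one error per
    // rejected resource. Resources that pass the check stay in `changes`, so
    // the caller can still forward them to the controller.
    static Map<String, String> rejectUnauthorized(Map<String, String> changes,
                                                  Predicate<String> isAuthorized) {
        Map<String, String> errors = new HashMap<>();
        Iterator<String> iterator = changes.keySet().iterator();
        while (iterator.hasNext()) {
            String resource = iterator.next();
            if (!isAuthorized.test(resource)) {
                errors.put(resource, "CLUSTER_AUTHORIZATION_FAILED");
                // Drop only this entry; the loop continues with the next resource.
                iterator.remove();
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, String> changes = new HashMap<>();
        changes.put("broker-1", "log.cleaner.backoff.ms=100000");
        changes.put("topic-foo", "flush.ms=1000");
        // Assumed policy for the demo: only names starting with "topic-" are allowed.
        Map<String, String> errors =
            rejectUnauthorized(changes, name -> name.startsWith("topic-"));
        System.out.println("errors=" + errors + " remaining=" + changes.keySet());
    }
}
```

With this shape, a failed check on one resource produces an error entry for that resource only, and the remaining entries are still processed.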







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612742563



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication failed.
-    // If so, create an error response for it.  Otherwise, add it to the idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
-          // This is kind of a weird case: what if we supply topic ID X and also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we end up
-          // here.  If authorization doesn't succeed, we refrain from commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their positions to
-    // distinguish between absent topics and topics we are not permitted to see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication failed.
+      // If so, remove it from the idToName map and create an error response for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication failed.
+      // If so, create an error response for it.  Otherwise, add it to the idToName map.
+      controller.findTopicIds(providedNames).thenCompose(topicIds => {
+        topicIds.forEach { (name, idOrError) =>
+          if (!describeable.contains(name)) {
+            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else if (idOrError.isError) {
+            appendResponse(name, ZERO_UUID, idOrError.error)
+          } else if (deletable.contains(name)) {
+            val id = idOrError.result()
+            if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
+              // This is kind of a weird case: what if we supply topic ID X and also a name
+              // that maps to ID X?  In that case, _if authorization succeeds_, we end up
+              // here.  If authorization doesn't succeed, we refrain from commenting on the
+              // situation since it would reveal topic ID mappings.
+              duplicateProvidedIds.add(id)
+              idToName.remove(id)
+              appendResponse(name, id, new ApiError(INVALID_REQUEST,
+                "The provided topic name maps to an ID that was already supplied."))
+            }
+          } else {
+            appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+        }
+        // Finally, the idToName map contains all the topics that we are authorized to delete.
+        // Perform the deletion and create responses for each one.
+        controller.deleteTopics(idToName.keySet).thenApply(idToError => {
+          idToError.forEach { (id, error) =>
+            appendResponse(idToName.get(id), id, error)
+          }
+          // Shuffle the responses so that users can not use patterns in their positions to
+          // distinguish between absent topics and topics we are not permitted to see.
+          Collections.shuffle(responses)
+          responses
+        })
+      })
+    })
   }
 
   def handleCreateTopics(request: RequestChannel.Request): Unit = {
-    val responseData = createTopics(request.body[CreateTopicsRequest].data(),
+    val createTopicsRequest = request.body[CreateTopicsRequest]
+    val future = createTopics(createTopicsRequest.data(),
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(identity))
-    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
-      responseData.setThrottleTimeMs(throttleTimeMs)
-      new CreateTopicsResponse(responseData)
+    future.whenComplete((result, exception) => {
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        if (exception != null) {
+          createTopicsRequest.getErrorResponse(throttleTimeMs, exception)
+        } else {
+          result.setThrottleTimeMs(throttleTimeMs)
+          new CreateTopicsResponse(result)
+        }
+      })
     })
   }
 
   def createTopics(request: CreateTopicsRequestData,
                    hasClusterAuth: Boolean,
-                   getCreatableTopics: Iterable[String] => Set[String]): CreateTopicsResponseData = {
+                   getCreatableTopics: Iterable[String] => Set[String])
+                   : CompletableFuture[CreateTopicsResponseData] = {

Review comment:
       It's kind of a long return type, so I'd prefer to have it on a separate line.







[GitHub] [kafka] chia7712 commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612981625



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {

Review comment:
       Just a code style nit. Could we use the following style
   ```scala
   controller.findTopicNames(providedIds).thenCompose { topicNames =>
   
   }
   ```
   
   instead of
   
   ```scala
   controller.findTopicNames(providedIds).thenCompose(topicNames => {
   
   })
   ```

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -505,9 +529,8 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)

Review comment:
       Just curious: why is this authorization check executed before getting (casting) the request? It seems most methods get the request first.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1023,6 +1027,22 @@ private QuorumController(LogContext logContext,
         });
     }
 
+    @Override
+    public CompletableFuture<AlterPartitionReassignmentsResponseData>
+            alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
+        CompletableFuture<AlterPartitionReassignmentsResponseData> future = new CompletableFuture<>();
+        future.completeExceptionally(new UnsupportedOperationException());

Review comment:
       As this method is not supported, maybe we can move this implementation to the interface? The benefit is that we don't need to add similar code to both `MockController` and `QuorumController`.
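A minimal sketch of that suggestion, assuming a Java 8+ `default` method on the interface. The nested `Controller` interface, the two implementing classes, and the `String` request/response types are hypothetical stand-ins for illustration; they are not the real Kafka `Controller` API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: String stands in for the request and response data classes.
class DefaultMethodSketch {
    interface Controller {
        // Shared "not supported" body, inherited by every implementation
        // that does not override it.
        default CompletableFuture<String> alterPartitionReassignments(String request) {
            CompletableFuture<String> future = new CompletableFuture<>();
            future.completeExceptionally(new UnsupportedOperationException());
            return future;
        }
    }

    // Neither implementation needs to repeat the unsupported-operation code.
    static class QuorumControllerSketch implements Controller { }

    static class MockControllerSketch implements Controller { }

    public static void main(String[] args) {
        Controller controller = new MockControllerSketch();
        CompletableFuture<String> future = controller.alterPartitionReassignments("request");
        System.out.println("failed=" + future.isCompletedExceptionally());
    }
}
```

The trade-off is that an implementation which later gains real support has to remember to override the default, since the compiler no longer forces it to.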

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -347,21 +370,22 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    val response = controller.createTopics(effectiveRequest).get()
-    duplicateTopicNames.forEach { name =>
-      response.topics().add(new CreatableTopicResult().
-        setName(name).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Found multiple entries for this topic."))
-    }
-    topicNames.forEach { name =>
-      if (!authorizedTopicNames.contains(name)) {
+    controller.createTopics(effectiveRequest).thenApply(response => {
+      duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
-          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+          setErrorCode(INVALID_REQUEST.code).
+          setErrorMessage("Found multiple entries for this topic."))

Review comment:
       How about unifying this error message? "Duplicate topic name."

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -155,6 +225,73 @@ class ControllerApisTest {
       brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testUnauthorizedHandleAlterClientQuotas(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
+          new AlterClientQuotasRequestData(), 0))))
+  }
+
+  @Test
+  def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = {
+    val requestData = new IncrementalAlterConfigsRequestData().setResources(
+      new AlterConfigsResourceCollection(
+        util.Arrays.asList(new IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("1").
+          setResourceType(ConfigResource.Type.BROKER.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
+            setName(KafkaConfig.LogCleanerBackoffMsProp).
+            setValue("100000").
+            setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
+        new IncrementalAlterConfigsRequestData.AlterConfigsResource().
+          setResourceName("foo").
+          setResourceType(ConfigResource.Type.TOPIC.id()).
+          setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
+            setName(TopicConfig.FLUSH_MS_CONFIG).
+            setValue("1000").
+            setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator())),
+        ).iterator()))
+    val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
+    createControllerApis(Some(createDenyAllAuthorizer()),
+      new MockController.Builder().build()).handleIncrementalAlterConfigs(request)

Review comment:
       Could you add unit tests for `BROKER_LOGGER` and `UNKNOWN`?

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -242,87 +253,99 @@ class ControllerApis(val requestChannel: RequestChannel,
     val toAuthenticate = new util.HashSet[String]
     toAuthenticate.addAll(providedNames)
     val idToName = new util.HashMap[Uuid, String]
-    controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-      if (nameOrError.isError) {
-        appendResponse(null, id, nameOrError.error())
-      } else {
-        toAuthenticate.add(nameOrError.result())
-        idToName.put(id, nameOrError.result())
-      }
-    }
-    // Get the list of deletable topics (those we can delete) and the list of describeable
-    // topics.  If a topic can't be deleted or described, we have to act like it doesn't
-    // exist, even when it does.
-    val topicsToAuthenticate = toAuthenticate.asScala
-    val (describeable, deletable) = if (hasClusterAuth) {
-      (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
-    } else {
-      (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
-    }
-    // For each topic that was provided by ID, check if authentication failed.
-    // If so, remove it from the idToName map and create an error response for it.
-    val iterator = idToName.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val id = entry.getKey
-      val name = entry.getValue
-      if (!deletable.contains(name)) {
-        if (describeable.contains(name)) {
-          appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+    controller.findTopicNames(providedIds).thenCompose(topicNames => {
+      topicNames.forEach { (id, nameOrError) =>
+        if (nameOrError.isError) {
+          appendResponse(null, id, nameOrError.error())
         } else {
-          appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          toAuthenticate.add(nameOrError.result())
+          idToName.put(id, nameOrError.result())
         }
-        iterator.remove()
       }
-    }
-    // For each topic that was provided by name, check if authentication failed.
-    // If so, create an error response for it.  Otherwise, add it to the idToName map.
-    controller.findTopicIds(providedNames).get().forEach { (name, idOrError) =>
-      if (!describeable.contains(name)) {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
-      } else if (idOrError.isError) {
-        appendResponse(name, ZERO_UUID, idOrError.error)
-      } else if (deletable.contains(name)) {
-        val id = idOrError.result()
-        if (duplicateProvidedIds.contains(id) || idToName.put(id, name) != null) {
-          // This is kind of a weird case: what if we supply topic ID X and also a name
-          // that maps to ID X?  In that case, _if authorization succeeds_, we end up
-          // here.  If authorization doesn't succeed, we refrain from commenting on the
-          // situation since it would reveal topic ID mappings.
-          duplicateProvidedIds.add(id)
-          idToName.remove(id)
-          appendResponse(name, id, new ApiError(INVALID_REQUEST,
-            "The provided topic name maps to an ID that was already supplied."))
-        }
+      // Get the list of deletable topics (those we can delete) and the list of describeable
+      // topics.
+      val topicsToAuthenticate = toAuthenticate.asScala
+      val (describeable, deletable) = if (hasClusterAuth) {
+        (topicsToAuthenticate.toSet, topicsToAuthenticate.toSet)
       } else {
-        appendResponse(name, ZERO_UUID, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+        (getDescribableTopics(topicsToAuthenticate), getDeletableTopics(topicsToAuthenticate))
       }
-    }
-    // Finally, the idToName map contains all the topics that we are authorized to delete.
-    // Perform the deletion and create responses for each one.
-    val idToError = controller.deleteTopics(idToName.keySet).get()
-    idToError.forEach { (id, error) =>
-        appendResponse(idToName.get(id), id, error)
-    }
-    // Shuffle the responses so that users can not use patterns in their positions to
-    // distinguish between absent topics and topics we are not permitted to see.
-    Collections.shuffle(responses)
-    responses
+      // For each topic that was provided by ID, check if authentication failed.
+      // If so, remove it from the idToName map and create an error response for it.
+      val iterator = idToName.entrySet().iterator()
+      while (iterator.hasNext) {
+        val entry = iterator.next()
+        val id = entry.getKey
+        val name = entry.getValue
+        if (!deletable.contains(name)) {
+          if (describeable.contains(name)) {
+            appendResponse(name, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          } else {
+            appendResponse(null, id, new ApiError(TOPIC_AUTHORIZATION_FAILED))
+          }
+          iterator.remove()
+        }
+      }
+      // For each topic that was provided by name, check if authentication failed.
+      // If so, create an error response for it.  Otherwise, add it to the idToName map.

Review comment:
       redundant space







[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r614218074



##########
File path: core/src/test/java/kafka/test/MockController.java
##########
@@ -197,15 +209,73 @@ private MockController(Collection<MockTopic> initialTopics) {
 
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
-            Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
+            Map<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> configChanges,
             boolean validateOnly) {
+        Map<ConfigResource, ApiError> results = new HashMap<>();
+        for (Entry<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> entry :
+                configChanges.entrySet()) {
+            ConfigResource resource = entry.getKey();
+            results.put(resource, incrementalAlterResource(resource, entry.getValue(), validateOnly));
+        }
+        CompletableFuture<Map<ConfigResource, ApiError>> future = new CompletableFuture<>();
+        future.complete(results);
+        return future;
+    }
+
+    private ApiError incrementalAlterResource(ConfigResource resource,
+            Map<String, Entry<AlterConfigOp.OpType, String>> ops, boolean validateOnly) {
+        for (Entry<String, Entry<AlterConfigOp.OpType, String>> entry : ops.entrySet()) {
+            AlterConfigOp.OpType opType = entry.getValue().getKey();
+            if (opType != SET && opType != DELETE) {
+                return new ApiError(INVALID_REQUEST, "This mock does not " +
+                    "support the " + opType + " config operation.");
+            }
+        }
+        if (!validateOnly) {
+            for (Entry<String, Entry<AlterConfigOp.OpType, String>> entry : ops.entrySet()) {
+                String key = entry.getKey();
+                AlterConfigOp.OpType op = entry.getValue().getKey();
+                String value = entry.getValue().getValue();
+                switch (op) {
+                    case SET:
+                        configs.computeIfAbsent(resource, __ -> new HashMap<>()).put(key, value);

Review comment:
       Well, I thought a method reference would work here for the hash map, but I tried it and it doesn't seem to work. 🤔
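One likely explanation, offered as a reading of the Java API rather than something confirmed in this thread: `Map.computeIfAbsent` passes the key to its mapping function, so `HashMap::new` would have to resolve to a `HashMap` constructor that accepts the key type. No such constructor exists for a key like `ConfigResource`, so the method reference does not compile, while a lambda that ignores its argument does. The sketch below uses `String` keys and a hypothetical `setConfig` helper in place of the mock's `configs` field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: String keys stand in for ConfigResource.
class ComputeIfAbsentSketch {
    static void setConfig(Map<String, Map<String, String>> configs,
                          String resource, String key, String value) {
        // The mapping function receives `resource`. Writing `HashMap::new` here
        // would require a HashMap(String) constructor, which does not exist,
        // so the lambda simply ignores its argument instead.
        configs.computeIfAbsent(resource, ignored -> new HashMap<>()).put(key, value);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> configs = new HashMap<>();
        setConfig(configs, "foo", "flush.ms", "1000");
        setConfig(configs, "foo", "retention.ms", "5000");
        System.out.println(configs.get("foo").size());
    }
}
```

With an `Integer` key the method reference would compile, but it would bind to `HashMap(int initialCapacity)`, which is rarely what is intended.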







[GitHub] [kafka] cmccabe commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r613551280



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -347,21 +370,22 @@ class ControllerApis(val requestChannel: RequestChannel,
         iterator.remove()
       }
     }
-    val response = controller.createTopics(effectiveRequest).get()
-    duplicateTopicNames.forEach { name =>
-      response.topics().add(new CreatableTopicResult().
-        setName(name).
-        setErrorCode(INVALID_REQUEST.code()).
-        setErrorMessage("Found multiple entries for this topic."))
-    }
-    topicNames.forEach { name =>
-      if (!authorizedTopicNames.contains(name)) {
+    controller.createTopics(effectiveRequest).thenApply(response => {
+      duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
-          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+          setErrorCode(INVALID_REQUEST.code).
+          setErrorMessage("Found multiple entries for this topic."))

Review comment:
       ok







[GitHub] [kafka] cmccabe commented on pull request #10505: MINOR: fix some bugs in ControllerApis.scala

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#issuecomment-820659355


   @chia7712: I re-ran the test failures locally and couldn't get them to occur.
   
   The failures I saw were mostly streams tests. Those have been a bit flaky lately. I hope we can fix this soon :(
   
   Also, closer to home, `RaftClusterTest#testCreateClusterAndCreateAndManyTopicsWithManyPartitions` seems to flake periodically on Jenkins (even before this PR), but I can't seem to reproduce it locally.

