Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/17 17:44:59 UTC

[GitHub] [kafka] rondagostino opened a new pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

rondagostino opened a new pull request #9300:
URL: https://github.com/apache/kafka/pull/9300




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r490604721



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,

Review comment:
       Might not matter since `CreateTopics` requests are infrequent, but the two passes for authorization are a bit vexing. Feels like we are missing a good intermediate type between this handler and `AdminManager`. Maybe we can replace the 3 maps that we pass to `AdminManager.createTopic` with a single map which contains all the state we need for each topic.
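
       A minimal sketch of what such an intermediate type could look like, assuming the per-topic state is just the two authorization outcomes. `TopicAuthState`, `TopicAuthStates.build`, and the `isAuthorized` callback are hypothetical names for illustration; none of them exist in Kafka.

   ```scala
   // Hypothetical per-topic state; not part of Kafka.
   final case class TopicAuthState(
     name: String,
     canCreate: Boolean,
     canDescribeConfigs: Boolean
   )

   object TopicAuthStates {
     // Builds one entry per requested topic in a single pass.
     // `isAuthorized` is an assumed callback taking (operation, topicName).
     def build(
       topics: Seq[String],
       hasClusterCreate: Boolean,
       isAuthorized: (String, String) => Boolean
     ): Map[String, TopicAuthState] =
       topics.map { name =>
         name -> TopicAuthState(
           name,
           // Cluster-level CREATE implies CREATE on every topic.
           canCreate = hasClusterCreate || isAuthorized("CREATE", name),
           canDescribeConfigs = isAuthorized("DESCRIBE_CONFIGS", name)
         )
       }.toMap
   }
   ```

       A single map of this shape could be handed to the next layer in place of separate collections. Note that this sketch authorizes one topic at a time, whereas `filterByAuthorized` works on the whole collection of names at once, so a real version would likely still authorize in bulk and only merge the outcomes here.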

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
-      }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+      topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
 
-      results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        }
+    results.forEach { topic =>
+      if (results.findAll(topic.name).size > 1) {
+        topic.setErrorCode(Errors.INVALID_REQUEST.code)
+        topic.setErrorMessage("Found multiple entries for this topic.")
+      } else if (!authorizedTopics.contains(topic.name)) {
+        topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        topic.setErrorMessage("Authorization failed.")
       }
-      val toCreate = mutable.Map[String, CreatableTopic]()
-      createTopicsRequest.data.topics.forEach { topic =>
-        if (results.find(topic.name).errorCode == Errors.NONE.code) {
-          toCreate += topic.name -> topic
-        }
+      if (!authorizedForDescribeConfigs.contains(topic.name)) {
+        topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
       }
+    }
+    val toCreate = mutable.Map[String, CreatableTopic]()
+    createTopicsRequest.data.topics.forEach { topic =>
+      if (results.find(topic.name).errorCode == Errors.NONE.code) {
+        toCreate += topic.name -> topic
+      }
+    }
+    if (!controller.isActive) {
+      // don't provide the information that this node is not the controller unless they were authorized
+      // to perform at least one of their requests
+      sendResponseCallback(
+        if (toCreate.isEmpty) {

Review comment:
       Hmm... If a topic is unauthorized, I think it's more important to return the authorization failure. I can't think of any reason why `NOT_CONTROLLER` should take precedence. It would just cause the request to be retried unnecessarily against the new controller. Similarly for the other APIs.
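
       The precedence described here can be sketched as a small pure function. This is an illustration only: `CreateTopicErrors`, `TopicError`, and `resolve` are hypothetical stand-ins, not Kafka's `Errors` enum or anything in `KafkaApis`.

   ```scala
   object CreateTopicErrors {
     // Hypothetical stand-ins for the relevant error codes.
     sealed trait TopicError
     case object NoError extends TopicError
     case object InvalidRequest extends TopicError
     case object TopicAuthorizationFailed extends TopicError
     case object NotController extends TopicError

     // Errors that a retry cannot fix are reported first;
     // NOT_CONTROLLER is only reported for otherwise-valid topics.
     def resolve(
       duplicate: Boolean,
       authorized: Boolean,
       controllerActive: Boolean
     ): TopicError =
       if (duplicate) InvalidRequest
       else if (!authorized) TopicAuthorizationFailed
       else if (!controllerActive) NotController
       else NoError
   }
   ```

       With this ordering, an unauthorized topic gets the authorization failure whether or not the broker is the controller, so the client has no reason to retry it elsewhere.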







[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r533763099



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+      topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+
+    results.forEach { topic =>
+      if (results.findAll(topic.name).size > 1) {
+        topic.setErrorCode(Errors.INVALID_REQUEST.code)
+        topic.setErrorMessage("Found multiple entries for this topic.")
+      } else if (!authorizedTopics.contains(topic.name)) {
+        topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        topic.setErrorMessage("Authorization failed.")
+      }
+      if (!authorizedForDescribeConfigs.contains(topic.name)) {
+        topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
       }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
-
+    }
+    if (!controller.isActive) {
+      // Don't provide the information that this node is not the controller unless they were authorized
+      // to perform at least one of their requests.  So only set NOT_CONTROLLER error for anything that so far has a
+      // success/NONE error code.  Keep the existing error codes that we've determined rather than overwriting them
+      // with NOT_CONTROLLER because that is potentially useful information for the client.
       results.forEach { topic =>
-        if (results.findAll(topic.name).size > 1) {
-          topic.setErrorCode(Errors.INVALID_REQUEST.code)
-          topic.setErrorMessage("Found multiple entries for this topic.")
-        } else if (!authorizedTopics.contains(topic.name)) {
-          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-          topic.setErrorMessage("Authorization failed.")
-        }
-        if (!authorizedForDescribeConfigs.contains(topic.name)) {
-          topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-        }
-      }
-      val toCreate = mutable.Map[String, CreatableTopic]()
-      createTopicsRequest.data.topics.forEach { topic =>
-        if (results.find(topic.name).errorCode == Errors.NONE.code) {
-          toCreate += topic.name -> topic
+        if(topic.errorCode() == Errors.NONE.code()) {

Review comment:
       nit: the convention is to add a space after `if`. There are a few of these in the patch.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
     }
 
+    // be sure to check authorization first, before checking if this is the controller, to avoid leaking
+    // information about the system (i.e. who is the controller) to principals unauthorized for that information
+
     val createTopicsRequest = request.body[CreateTopicsRequest]
     val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-    if (!controller.isActive) {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name)
-          .setErrorCode(Errors.NOT_CONTROLLER.code))
-      }
-      sendResponseCallback(results)
-    } else {
-      createTopicsRequest.data.topics.forEach { topic =>
-        results.add(new CreatableTopicResult().setName(topic.name))
+    createTopicsRequest.data.topics.forEach { topic =>
+      results.add(new CreatableTopicResult().setName(topic.name))
+    }
+    val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+      logIfDenied = false)
+    val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+    val authorizedTopics =
+      if (hasClusterAuthorization) topics.toSet
+      else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+    val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
+      topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+
+    results.forEach { topic =>
+      if (results.findAll(topic.name).size > 1) {
+        topic.setErrorCode(Errors.INVALID_REQUEST.code)
+        topic.setErrorMessage("Found multiple entries for this topic.")
+      } else if (!authorizedTopics.contains(topic.name)) {
+        topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        topic.setErrorMessage("Authorization failed.")
+      }
+      if (!authorizedForDescribeConfigs.contains(topic.name)) {
+        topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
       }
-      val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-        logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-      val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
-
+    }
+    if (!controller.isActive) {

Review comment:
       I think we could probably roll this into the loop above. What I'm thinking is something like this:
   ```scala
       val toCreate = mutable.Map[String, CreatableTopic]()
   
       results.forEach { topic =>
         if (results.findAll(topic.name).size > 1) {
           topic.setErrorCode(Errors.INVALID_REQUEST.code)
           topic.setErrorMessage("Found multiple entries for this topic.")
         } else if (!authorizedTopics.contains(topic.name)) {
           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
           topic.setErrorMessage("Authorization failed.")
         } else if (!controller.isActive) {
           topic.setErrorCode(Errors.NOT_CONTROLLER.code)
         } else {
           toCreate += topic.name -> topic
         }
         if (!authorizedForDescribeConfigs.contains(topic.name)) {
           topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
         }
       }
   
       if (toCreate.isEmpty) {
         sendResponseCallback(results)
       } else {
         ...
   ```
   
       

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3078,15 +3100,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleAlterUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
     val alterUserScramCredentialsRequest = request.body[AlterUserScramCredentialsRequest]
 
-    if (!controller.isActive) {
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        alterUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.NOT_CONTROLLER.exception))
-    } else if (authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
-      val result = adminManager.alterUserScramCredentials(
-        alterUserScramCredentialsRequest.data.upsertions().asScala, alterUserScramCredentialsRequest.data.deletions().asScala)
-      sendResponseMaybeThrottle(request, requestThrottleMs =>
-        new AlterUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
+    // be sure to check authorization first, before checking if this is the controller, to avoid leaking

Review comment:
       More of a nit, but the comments seem to overstate the importance of not leaking the controller's identity. I think this is really more about establishing a consistent practice of always checking authorization first, before doing anything else.
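
       For a whole-request API such as this one, the "authorization first" practice might look like the sketch below. `AuthorizeFirst`, `Outcome`, and `handle` are hypothetical names for illustration, and the two boolean parameters stand in for the real `authorize(...)` call and `controller.isActive`.

   ```scala
   object AuthorizeFirst {
     // Hypothetical outcomes for a request that needs cluster-level permission.
     sealed trait Outcome
     case object ClusterAuthorizationFailed extends Outcome
     case object NotController extends Outcome
     final case class Handled(result: String) extends Outcome

     // `action` is passed by name, so it runs only when both checks pass.
     def handle(authorized: Boolean, controllerActive: Boolean)(action: => String): Outcome =
       if (!authorized) ClusterAuthorizationFailed
       else if (!controllerActive) NotController
       else Handled(action)
   }
   ```

       The point of the ordering is uniformity: every handler answers "is this principal allowed?" before consulting any other broker state.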

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1936,25 +1945,38 @@ class KafkaApis(val requestChannel: RequestChannel,
          else
            toDelete += topic.name
       }
-      // If no authorized topics return immediately
-      if (toDelete.isEmpty)
-        sendResponseCallback(results)
-      else {
-        def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = {
-          errors.foreach {
-            case (topicName, error) =>
-              results.find(topicName)
-                .setErrorCode(error.code)
+      if (!controller.isActive) {

Review comment:
       Same as above. I think it's a little more natural to roll the controller check into the loop above. If you do that, then the structure here will match what I suggested for create topic.



