Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 22:52:32 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

hachikuji opened a new pull request #10223:
URL: https://github.com/apache/kafka/pull/10223


   We now accept topicIds in the `DeleteTopic` request. If the client principal does not have `Describe` permission, we return `UNKNOWN_TOPIC_ID` regardless of whether the topic exists. However, if the topic does exist, we also set its name in the response, which gives the user a way to infer its existence. This is probably not a major issue, since the client would still need to find the topicId first, but it is an easy hole to plug.
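
A minimal sketch of the idea described above, not the actual `KafkaApis` code. `DeleteResult`, `NameLeakSketch`, `resolve`, and `canDescribe` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and error codes are plain strings for illustration. It shows how clearing the name makes the "unknown ID" and "no `Describe` permission" cases indistinguishable to the caller:

```scala
import java.util.UUID

// Hypothetical stand-in for a per-topic response entry; not Kafka's generated class.
final case class DeleteResult(topicId: UUID, name: Option[String], error: String)

object NameLeakSketch {
  // Resolve an ID-based delete request for one topic.
  // `canDescribe` is an assumed authorization check keyed by topic name.
  def resolve(
    topicId: UUID,
    topicNames: Map[UUID, String],
    canDescribe: String => Boolean
  ): DeleteResult =
    topicNames.get(topicId) match {
      case None =>
        // The ID does not resolve to any topic.
        DeleteResult(topicId, None, "UNKNOWN_TOPIC_ID")
      case Some(name) if !canDescribe(name) =>
        // Same error as the unknown case, and the name is cleared so that
        // the response does not reveal that the topic exists.
        DeleteResult(topicId, None, "UNKNOWN_TOPIC_ID")
      case Some(name) =>
        DeleteResult(topicId, Some(name), "NONE")
    }
}
```

With this shape, a principal without `Describe` permission receives the same entry whether or not the topic exists.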
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585017852



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
           throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
         val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
-        else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
+        else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull

Review comment:
       I mainly did this for easier mocking. I would like to see reduced access for `topicNames`, but that feels like something to address separately.
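
A rough sketch of why a narrow accessor is easier to mock than a publicly exposed map. `ContextSketch` and `ContextSketchDemo` are hypothetical, simplified stand-ins and not the real `ControllerContext`; `java.util.UUID` replaces Kafka's `Uuid` here:

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical, simplified stand-in for the controller's context class.
class ContextSketch {
  // Keeping the map private is the "reduced access" mentioned above.
  private val topicNames = mutable.Map.empty[UUID, String]

  def addTopic(id: UUID, name: String): Unit = topicNames.put(id, name)

  // Callers (and mocks) only deal with a single method returning an Option.
  def topicName(id: UUID): Option[String] = topicNames.get(id)
}

object ContextSketchDemo {
  // Mirrors the shape of the changed line: an Option converted to a nullable name.
  def resolveName(ctx: ContextSketch, id: UUID): String =
    ctx.topicName(id).orNull
}
```

A test then only needs to stub `topicName` for each ID instead of building a whole map.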







[GitHub] [kafka] jolshan edited a comment on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-787106214


   It also seems like this part of KafkaApis is completely rewritten in https://github.com/apache/kafka/pull/10184. So maybe any changes to KafkaApis should be addressed in that PR?





[GitHub] [kafka] chia7712 commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-787081242


   I wonder whether a shared helper method could handle this complicated logic for both `KafkaApis` and `ControllerApis`.





[GitHub] [kafka] chia7712 commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585630167



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {

Review comment:
       (code style)
   
   Would it be more readable to replace `!authorizedDescribeTopics(topic.name)` with `!authorizedDescribeTopics.contains(topic.name)`? The other set collections in this method use `.contains`.
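
For illustration, assuming `authorizedDescribeTopics` is a Scala `Set[String]` (the object and method names below are hypothetical), the two spellings behave identically, so the choice is purely stylistic:

```scala
object SetMembershipSketch {
  // Assumed shape of the authorized-topic collection.
  val authorizedDescribeTopics: Set[String] = Set("bar", "baz")

  // A Scala Set is also a function A => Boolean, so applying it tests membership.
  def viaApply(name: String): Boolean = authorizedDescribeTopics(name)

  // The explicit form, consistent with the other checks in the method.
  def viaContains(name: String): Boolean = authorizedDescribeTopics.contains(name)
}
```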







[GitHub] [kafka] jolshan commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-787106214


   It also seems like this code is completely rewritten in https://github.com/apache/kafka/pull/10184. So maybe any changes to KafkaApis should be addressed in that PR?





[GitHub] [kafka] hachikuji commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585212982



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
       @chia7712 @jolshan I ended up doing this here after all since it sounds like there is consensus on not treating the topicId as sensitive based on the JIRA discussion. Let me know if you have any concerns.







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585027051



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+        .build(ApiKeys.DELETE_TOPICS.latestVersion)
+    }
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    def lookupErrorCode(topic: String): Option[Errors] = {
+      Option(deleteResponse.data.responses().find(topic))
+        .map(result => Errors.forCode(result.errorCode))
+    }
+
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo"))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar"))
+    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
+  }
+
+  def expectTopicAuthorization(

Review comment:
       I looked at doing this, but the need to preserve order makes it just as cumbersome. The capture also provides some type safety and avoids the need for casting from the argument list. 







[GitHub] [kafka] hachikuji merged pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10223:
URL: https://github.com/apache/kafka/pull/10223


   





[GitHub] [kafka] chia7712 commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584123786



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
           throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
         val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
-        else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
+        else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull

Review comment:
       Do you want to narrow the access of `topicNames`? If so, we should change the modifier of `topicNames` from public to private.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
       I was going to discuss this error code on https://github.com/apache/kafka/pull/10184#discussion_r583933291 :)
   
   ### return `UNKNOWN_TOPIC_ID`
   
   Could you add a clear error message to this response (call `setErrorMessage`)? I can imagine users being confused by the error code `UNKNOWN_TOPIC_ID`, because it now covers two scenarios: the ID does not exist, or you have no permission to describe the topic.
   
   ### return `TOPIC_AUTHORIZATION_FAILED`
   
   It states the error accurately and helps users handle unauthorized requests. Personally, I think this error is more suitable :)
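
A small sketch contrasting the two options above. Everything here is hypothetical: `TopicError` is a simplified stand-in for the response entry, error codes are plain strings, and the message wording is invented for illustration:

```scala
object ErrorChoiceSketch {
  // Hypothetical, simplified response entry; not Kafka's generated class.
  final case class TopicError(code: String, message: Option[String])

  // Option 1: keep UNKNOWN_TOPIC_ID but state that it covers two cases.
  // The message wording is an assumption made for this sketch.
  def unknownIdWithMessage: TopicError =
    TopicError(
      "UNKNOWN_TOPIC_ID",
      Some("The topic ID does not exist, or the principal lacks Describe permission.")
    )

  // Option 2: report the authorization failure directly.
  def authorizationFailed: TopicError =
    TopicError("TOPIC_AUTHORIZATION_FAILED", None)

  // A caller can branch on the cause only when the code itself is distinct.
  def isAuthorizationProblem(e: TopicError): Boolean =
    e.code == "TOPIC_AUTHORIZATION_FAILED"
}
```

With the first option a client has to read the message text to learn that authorization may be the cause; with the second it can branch on the code alone.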







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584145063



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)

Review comment:
       That's a good question. I agree using `mapKey` makes less sense now. Let me try it out. If the diff is not too bad, we can do it here. Otherwise, we can do it separately.







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585027051



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+        .build(ApiKeys.DELETE_TOPICS.latestVersion)
+    }
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    def lookupErrorCode(topic: String): Option[Errors] = {
+      Option(deleteResponse.data.responses().find(topic))
+        .map(result => Errors.forCode(result.errorCode))
+    }
+
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo"))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar"))
+    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
+  }
+
+  def expectTopicAuthorization(

Review comment:
       I looked at doing this, but the need to preserve order in the result makes it just as cumbersome. The capture also provides some type safety and avoids the need for casting from the argument list. 







[GitHub] [kafka] hachikuji edited a comment on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788239224


   @jolshan @chia7712 I don't think `KafkaApis` has been rewritten yet. My suggestion is to get this checked in as is so that we can definitely get it into 2.8. Then we should consider how to consolidate the handling logic in `ControllerApis` and `KafkaApis` after #10184 is merged.





[GitHub] [kafka] dengziming commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584100885



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+        .build(ApiKeys.DELETE_TOPICS.latestVersion)
+    }
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    def lookupErrorCode(topic: String): Option[Errors] = {
+      Option(deleteResponse.data.responses().find(topic))
+        .map(result => Errors.forCode(result.errorCode))
+    }
+
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo"))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar"))
+    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
+  }
+
+  def expectTopicAuthorization(

Review comment:
       Could we simplify this method using `AuthHelperTest.matchSameElements()`?







[GitHub] [kafka] jolshan commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584161936



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
       I think the issue with TOPIC_AUTHORIZATION_FAILED is that we would be returning a different error than in the case where the topic ID does not exist, which implies the existence of a topic when we should not reveal it.
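
The concern above can be illustrated with a minimal sketch. It is not the `KafkaApis` code: `ExistenceLeakSketch`, `Result`, `obscured` and `distinct` are hypothetical names, error codes are plain strings, and the Delete permission check is left out to keep the comparison small. It only shows that a caller probing topic IDs can tell "exists but hidden" from "does not exist" when the two cases return different errors.

```scala
import java.util.UUID

object ExistenceLeakSketch {
  // Hypothetical, simplified response entry: optional name plus an error label.
  final case class Result(name: Option[String], error: String)

  // Policy A: a topic the caller cannot describe looks exactly like a missing one.
  def obscured(topics: Map[UUID, String], canDescribe: String => Boolean)(id: UUID): Result =
    topics.get(id) match {
      case Some(name) if canDescribe(name) => Result(Some(name), "NONE")
      case _                               => Result(None, "UNKNOWN_TOPIC_ID")
    }

  // Policy B: the name is still withheld, but the error code differs,
  // so the caller can infer that the ID maps to an existing topic.
  def distinct(topics: Map[UUID, String], canDescribe: String => Boolean)(id: UUID): Result =
    topics.get(id) match {
      case Some(name) if canDescribe(name) => Result(Some(name), "NONE")
      case Some(_)                         => Result(None, "TOPIC_AUTHORIZATION_FAILED")
      case None                            => Result(None, "UNKNOWN_TOPIC_ID")
    }
}
```

Under policy A the responses for a hidden topic and a missing topic are identical; under policy B they differ only in the error code, which is the trade-off being discussed in this thread.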







[GitHub] [kafka] dajac commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584092454



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)

Review comment:
       I just noticed that `Name` in the schema [1] uses `mapKey: true`. I wonder if we should remove it now that `Name` is nullable. Usually, when we use `mapKey`, we expect to be able to look up entries by name, and that is not always possible now. It seems that we don't rely on it in the admin client, but we do in tests. What do you think?
   
   [1] https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DeleteTopicsResponse.json#L41
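
A small sketch of why a nullable name is awkward as a lookup key. This does not use the generated `DeleteTopicsResponseData` classes; `TopicResult`, `findByName` and `findById` are hypothetical stand-ins, with `Option[String]` playing the role of the nullable `Name` field and a plain string label instead of a numeric error code.

```scala
import java.util.UUID

object NullableKeySketch {
  // Hypothetical stand-in for one entry of a delete-topics response.
  final case class TopicResult(name: Option[String], topicId: UUID, error: String)

  // Name-based lookup only finds entries whose name was actually returned.
  def findByName(results: Seq[TopicResult], name: String): Option[TopicResult] =
    results.find(_.name.contains(name))

  // ID-based lookup keeps working when the name has been cleared.
  def findById(results: Seq[TopicResult], id: UUID): Option[TopicResult] =
    results.find(_.topicId == id)
}
```

Once the name of an undescribable topic is cleared, several entries may share the same empty key, so a caller that requested deletion by ID has to match results on the topic ID instead.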







[GitHub] [kafka] jolshan commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788251941


   @hachikuji Ah sorry. I was misreading the file name. Makes sense.





[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585014029



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)

Review comment:
       I made the comment before fully realizing how topic IDs were complicating the matter. I filed this JIRA to discuss further: https://issues.apache.org/jira/browse/KAFKA-12394. I'd suggest that we keep the current behavior in this PR and fix the small issue. It would be good to have the tests in any case.







[GitHub] [kafka] chia7712 commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585628385



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response. Note, however, that we do not consider
+          // the topicId itself to be sensitive, so there is no reason to obscure
+          // this case with `UNKNOWN_TOPIC_ID`.
+          topic.setName(null)
+          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        } else if (!authorizedDeleteTopics.contains(topic.name)) {

Review comment:
       > This seems defensible to me. The UNKNOWN_TOPIC_OR_PARTITION error will cause the client to retry because of the possibility of stale metadata, but it can't delete the topic anyway because of the authorization failure. It seems better to fail fast?
   
   That makes sense to me. We have to make sure this rule is applied to #10184 :)







[GitHub] [kafka] hachikuji commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585036724



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response.
+          topic.setName(null)

Review comment:
       We have one dependency here which makes this a little more work than I wanted to take on in this PR: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1909. I filed a separate JIRA so that we don't forget about it: https://issues.apache.org/jira/browse/KAFKA-12395.







[GitHub] [kafka] chia7712 commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585322244



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response. Note, however, that we do not consider
+          // the topicId itself to be sensitive, so there is no reason to obscure
+          // this case with `UNKNOWN_TOPIC_ID`.
+          topic.setName(null)
+          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        } else if (!authorizedDeleteTopics.contains(topic.name)) {

Review comment:
       According to https://github.com/apache/kafka/pull/10184#discussion_r585086425, should it handle the case `name provided, topic missing, describable => UNKNOWN_TOPIC_OR_PARTITION`?







[GitHub] [kafka] hachikuji commented on pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#issuecomment-788239224


   @jolshan @chia7712 I don't think `KafkaApis` has been rewritten yet. My suggestion is to get this checked in as is so that we can be sure it makes it into 2.8. Then we should consider how to consolidate the handling logic in `ControllerApis` and `KafkaApis`.





[GitHub] [kafka] hachikuji commented on a change in pull request #10223: KAFKA-12394: Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r585602805



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1884,20 +1884,26 @@ class KafkaApis(val requestChannel: RequestChannel,
       val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
-        val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
-         if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-           topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-           topic.setErrorMessage("Topic IDs are not supported on the server.")
-         } else if (unresolvedTopicId)
-             topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
-         else if (!authorizedDeleteTopics.contains(topic.name))
-           topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-         else if (!metadataCache.contains(topic.name))
-           topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-         else
-           toDelete += topic.name
+        val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
+        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
+          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+          topic.setErrorMessage("Topic IDs are not supported on the server.")
+        } else if (unresolvedTopicId) {
+          topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+        } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) {
+          // Because the client does not have Describe permission, the name should
+          // not be returned in the response. Note, however, that we do not consider
+          // the topicId itself to be sensitive, so there is no reason to obscure
+          // this case with `UNKNOWN_TOPIC_ID`.
+          topic.setName(null)
+          topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        } else if (!authorizedDeleteTopics.contains(topic.name)) {

Review comment:
       I guess there are really two sub-cases. Here's how they are currently handled:
   ```
   name provided, topic missing, describable, deletable => UNKNOWN_TOPIC_OR_PARTITION
   name provided, topic missing, describable, undeletable => TOPIC_AUTHORIZATION_FAILED
   ```
   This seems defensible to me. The `UNKNOWN_TOPIC_OR_PARTITION` error will cause the client to retry because of the possibility of stale metadata, but it can't delete the topic anyway because of the authorization failure. It seems better to fail fast?
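
The two sub-cases can be written down as a tiny decision function. This is a sketch only, assuming a topic requested by name that the caller can already describe; `DeleteByNameSketch`, `Outcome` and `resolve` are hypothetical names, not part of `KafkaApis`. It mirrors the order of the checks in the diff above, where the Delete permission is tested before topic existence.

```scala
object DeleteByNameSketch {
  sealed trait Outcome
  case object TopicAuthorizationFailed extends Outcome
  case object UnknownTopicOrPartition extends Outcome
  case object Delete extends Outcome

  // Assumes the name was provided and Describe is already granted.
  // Authorization is checked first, so an undeletable topic fails fast
  // with a non-retriable error even when it does not exist.
  def resolve(exists: Boolean, deletable: Boolean): Outcome =
    if (!deletable) TopicAuthorizationFailed
    else if (!exists) UnknownTopicOrPartition
    else Delete
}
```

For example, `resolve(exists = false, deletable = false)` yields `TopicAuthorizationFailed`, matching the second row of the table above, while `resolve(exists = false, deletable = true)` yields `UnknownTopicOrPartition`.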



