You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/27 10:14:47 UTC

[GitHub] [kafka] dengziming commented on a change in pull request #10223: MINOR: Do not expose topic name in `DeleteTopic` response if no describe permission

dengziming commented on a change in pull request #10223:
URL: https://github.com/apache/kafka/pull/10223#discussion_r584100885



##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -3479,6 +3481,161 @@ class KafkaApisTest {
     assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
   }
 
+  @Test
+  def testDeleteTopicsByIdAuthorization(): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED
+    ))
+
+    val topicIdsMap = Map(
+      Uuid.randomUuid() -> Some("foo"),
+      Uuid.randomUuid() -> Some("bar"),
+      Uuid.randomUuid() -> None
+    )
+
+    topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+      EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+    }
+
+    val topicDatas = topicIdsMap.keys.map { topicId =>
+      new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+    }.toList
+    val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+      .setTopics(topicDatas.asJava))
+      .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, controllerContext, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    topicIdsMap.foreach { case (topicId, nameOpt) =>
+      val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get
+      nameOpt match {
+        case Some("foo") =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case Some("bar") =>
+          assertEquals("bar", response.name)
+          assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode))
+        case None =>
+          assertNull(response.name)
+          assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode))
+        case _ =>
+          fail("Unexpected topic id/name mapping")
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = {
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.anyObject(classOf[RequestChannel.Request]),
+      EasyMock.anyShort()
+    )).andReturn(UnboundedControllerMutationQuota)
+    EasyMock.expect(controller.isActive).andReturn(true)
+
+    // Try to delete three topics:
+    // 1. One without describe permission
+    // 2. One without delete permission
+    // 3. One which is authorized, but doesn't exist
+
+    expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.DENIED,
+      "baz" -> AuthorizationResult.ALLOWED
+    ))
+
+    val deleteRequest = if (usePrimitiveTopicNameArray) {
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopicNames(List("foo", "bar", "baz").asJava))
+        .build(5.toShort)
+    } else {
+      val topicDatas = List(
+        new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+        new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+      )
+      new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+        .setTopics(topicDatas.asJava))
+        .build(ApiKeys.DELETE_TOPICS.latestVersion)
+    }
+
+    val request = buildRequest(deleteRequest)
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager,
+      requestChannel, txnCoordinator, controller, authorizer)
+    createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request)
+
+    val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse]
+
+    def lookupErrorCode(topic: String): Option[Errors] = {
+      Option(deleteResponse.data.responses().find(topic))
+        .map(result => Errors.forCode(result.errorCode))
+    }
+
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo"))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar"))
+    assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz"))
+  }
+
+  def expectTopicAuthorization(

Review comment:
       Could we simplify this method using `AuthHelperTest.matchSameElements()`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org