You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/11 23:24:37 UTC

[kafka] branch 3.1 updated: KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 63ca77e  KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)
63ca77e is described below

commit 63ca77ef67ce8beadb90047103d0920660fa6e39
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Feb 11 15:01:08 2022 -0800

    KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)
    
    This patch fixes two permission inconsistencies between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. Additionally, kraft allowed `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes these issues a [...]
    
    Reviewers: José Armando García Sancio <js...@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   |  6 +-
 .../main/scala/kafka/server/ControllerApis.scala   | 28 ++++-----
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../main/scala/kafka/server/ZkAdminManager.scala   |  6 +-
 .../unit/kafka/server/ControllerApisTest.scala     | 40 ++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 68 ++++++++++++++++++++++
 6 files changed, 130 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9afefe3..46ce379 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -105,7 +105,7 @@ class KafkaController(val config: KafkaConfig,
     new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
   val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
     new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
-  val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+  private val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
     partitionStateMachine, new ControllerDeletionClient(this, zkClient))
 
   private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
@@ -218,6 +218,10 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def isTopicQueuedForDeletion(topic: String): Boolean = {
+    topicDeletionManager.isTopicQueuedUpForDeletion(topic)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ed9b55a..acfb700 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -683,9 +683,15 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def handleCreatePartitions(request: RequestChannel.Request): Unit = {
-    val future = createPartitions(request.body[CreatePartitionsRequest].data,
-      authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
-      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+    def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
+      authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
+    }
+
+    val future = createPartitions(
+      request.body[CreatePartitionsRequest].data,
+      filterAlterAuthorizedTopics
+    )
+
     future.whenComplete { (responses, exception) =>
       if (exception != null) {
         requestHelper.handleError(request, exception)
@@ -700,10 +706,10 @@ class ControllerApis(val requestChannel: RequestChannel,
     }
   }
 
-  def createPartitions(request: CreatePartitionsRequestData,
-                       hasClusterAuth: Boolean,
-                       getCreatableTopics: Iterable[String] => Set[String])
-                       : CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
+  def createPartitions(
+    request: CreatePartitionsRequestData,
+    getAlterAuthorizedTopics: Iterable[String] => Set[String]
+  ): CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
     val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
     val responses = new util.ArrayList[CreatePartitionsTopicResult]()
     val duplicateTopicNames = new util.HashSet[String]()
@@ -721,13 +727,7 @@ class ControllerApis(val requestChannel: RequestChannel,
         setErrorMessage("Duplicate topic name."))
         topicNames.remove(topicName)
     }
-    val authorizedTopicNames = {
-      if (hasClusterAuth) {
-        topicNames.asScala
-      } else {
-        getCreatableTopics(topicNames.asScala)
-      }
-    }
+    val authorizedTopicNames = getAlterAuthorizedTopics(topicNames.asScala)
     val topics = new util.ArrayList[CreatePartitionsTopic]
     topicNames.forEach { topicName =>
       if (authorizedTopicNames.contains(topicName)) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a4b38be..90a28e7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         notDuped)(_.name)
 
       val (queuedForDeletion, valid) = authorized.partition { topic =>
-        zkSupport.controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic.name)
+        zkSupport.controller.isTopicQueuedForDeletion(topic.name)
       }
 
       val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index d2e7456..3ca05cf 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -292,7 +292,7 @@ class ZkAdminManager(val config: KafkaConfig,
     }
   }
 
-  def createPartitions(timeout: Int,
+  def createPartitions(timeoutMs: Int,
                        newPartitions: Seq[CreatePartitionsTopic],
                        validateOnly: Boolean,
                        controllerMutationQuota: ControllerMutationQuota,
@@ -369,7 +369,7 @@ class ZkAdminManager(val config: KafkaConfig,
     }
 
     // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
-    if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
+    if (timeoutMs <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
       val results = metadata.map { createPartitionMetadata =>
         // ignore topics that already have errors
         if (createPartitionMetadata.error.isSuccess && !validateOnly) {
@@ -381,7 +381,7 @@ class ZkAdminManager(val config: KafkaConfig,
       callback(results)
     } else {
       // 3. else pass the assignments and errors to the delayed operation and set the keys
-      val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, callback)
+      val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback)
       val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name))
       // try to complete the request immediately, otherwise put it into the purgatory
       topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 2176d23..479bf8e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -727,7 +727,45 @@ class ControllerApisTest {
       new CreatePartitionsTopicResult().setName("baz").
         setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
         setErrorMessage(null)),
-      controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
+      controllerApis.createPartitions(request, _ => Set("foo", "bar")).get().asScala.toSet)
+  }
+
+  @Test
+  def testCreatePartitionsAuthorization(): Unit = {
+    val controller = new MockController.Builder()
+      .newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"))
+      .build()
+    val authorizer = mock(classOf[Authorizer])
+    val controllerApis = createControllerApis(Some(authorizer), controller)
+
+    val requestData = new CreatePartitionsRequestData()
+    requestData.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
+    requestData.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
+    val request = new CreatePartitionsRequest.Builder(requestData).build()
+
+    val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
+    val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
+
+    val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
+    val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument[util.List[Action]](1).asScala
+      val results = actions.map { action =>
+        if (action == fooAction) AuthorizationResult.ALLOWED
+        else if (action == barAction) AuthorizationResult.DENIED
+        else throw new AssertionError(s"Unexpected action $action")
+      }
+      new util.ArrayList[AuthorizationResult](results.asJava)
+    }
+
+    val response = handleRequest[CreatePartitionsResponse](request, controllerApis)
+    val results = response.data.results.asScala
+    assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 98c2666..351cad6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -83,6 +83,8 @@ import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 import java.util.Arrays
 
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
+
 class KafkaApisTest {
 
   private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
@@ -796,6 +798,72 @@ class KafkaApisTest {
     testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder)
   }
 
+  @Test
+  def testCreatePartitionsAuthorization(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+
+    val timeoutMs = 35000
+    val requestData = new CreatePartitionsRequestData()
+      .setTimeoutMs(timeoutMs)
+      .setValidateOnly(false)
+    val fooCreatePartitionsData = new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2)
+    val barCreatePartitionsData = new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10)
+    requestData.topics().add(fooCreatePartitionsData)
+    requestData.topics().add(barCreatePartitionsData)
+
+    val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
+    val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
+
+    val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
+    val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
+
+    val actionsCapture = EasyMock.newCapture[util.List[Action]]()
+    EasyMock.expect(authorizer.authorize(
+      anyObject[RequestContext],
+      EasyMock.capture(actionsCapture)
+    )).andAnswer(() => {
+      val actions = actionsCapture.getValue.asScala
+      val results = actions.map { action =>
+        if (action == fooAction) AuthorizationResult.ALLOWED
+        else if (action == barAction) AuthorizationResult.DENIED
+        else throw new AssertionError(s"Unexpected action $action")
+      }
+      new util.ArrayList[AuthorizationResult](results.asJava)
+    })
+
+    val request = buildRequest(new CreatePartitionsRequest.Builder(requestData).build())
+    val capturedResponse = expectNoThrottling(request)
+
+    EasyMock.expect(controller.isActive).andReturn(true)
+    EasyMock.expect(controller.isTopicQueuedForDeletion("foo")).andReturn(false)
+
+    EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+      EasyMock.eq(request), EasyMock.anyShort())
+    ).andReturn(UnboundedControllerMutationQuota)
+
+    val callbackCapture = EasyMock.newCapture[Map[String, ApiError] => Unit]()
+    EasyMock.expect(adminManager.createPartitions(
+      timeoutMs = EasyMock.eq(timeoutMs),
+      newPartitions = EasyMock.eq(Seq(fooCreatePartitionsData)),
+      validateOnly = EasyMock.eq(false),
+      controllerMutationQuota = EasyMock.eq(UnboundedControllerMutationQuota),
+      callback = EasyMock.capture(callbackCapture)
+    )).andAnswer(() => {
+      val callback = callbackCapture.getValue
+      callback.apply(Map("foo" -> ApiError.NONE))
+    })
+
+    EasyMock.replay(authorizer, adminManager, replicaManager, clientRequestQuotaManager,
+      requestChannel, controller, clientControllerQuotaManager)
+    kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+
+    val response = capturedResponse.getValue.asInstanceOf[CreatePartitionsResponse]
+    val results = response.data.results.asScala
+    assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
+    assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
+  }
+
   private def createTopicAuthorization(authorizer: Authorizer,
                                        operation: AclOperation,
                                        authorizedTopic: String,