You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/02/11 23:24:37 UTC
[kafka] branch 3.1 updated: KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 63ca77e KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)
63ca77e is described below
commit 63ca77ef67ce8beadb90047103d0920660fa6e39
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Feb 11 15:01:08 2022 -0800
KAFKA-13661; Consistent permissions in KRaft for CreatePartitions API (#11745)
This patch fixes two permission inconsistencies between kraft and zk authorization for the `CreatePartitions` request. Previously kraft was requiring `CREATE` permission on the `Topic` resource when it should have required `ALTER`. Additionally, kraft allowed `CREATE` on the `Cluster` resource, which is not supported in zk clusters and was not documented in KIP-195: https://cwiki.apache.org/confluence/display/KAFKA/KIP-195%3A+AdminClient.createPartitions. This patch fixes these issues a [...]
Reviewers: José Armando García Sancio <js...@gmail.com>
---
.../scala/kafka/controller/KafkaController.scala | 6 +-
.../main/scala/kafka/server/ControllerApis.scala | 28 ++++-----
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/ZkAdminManager.scala | 6 +-
.../unit/kafka/server/ControllerApisTest.scala | 40 ++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 68 ++++++++++++++++++++++
6 files changed, 130 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9afefe3..46ce379 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -105,7 +105,7 @@ class KafkaController(val config: KafkaConfig,
new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
- val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+ private val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
partitionStateMachine, new ControllerDeletionClient(this, zkClient))
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
@@ -218,6 +218,10 @@ class KafkaController(val config: KafkaConfig,
}
}
+ def isTopicQueuedForDeletion(topic: String): Boolean = {
+ topicDeletionManager.isTopicQueuedUpForDeletion(topic)
+ }
+
private def state: ControllerState = eventManager.state
/**
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index ed9b55a..acfb700 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -683,9 +683,15 @@ class ControllerApis(val requestChannel: RequestChannel,
}
def handleCreatePartitions(request: RequestChannel.Request): Unit = {
- val future = createPartitions(request.body[CreatePartitionsRequest].data,
- authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
- names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+ def filterAlterAuthorizedTopics(topics: Iterable[String]): Set[String] = {
+ authHelper.filterByAuthorized(request.context, ALTER, TOPIC, topics)(n => n)
+ }
+
+ val future = createPartitions(
+ request.body[CreatePartitionsRequest].data,
+ filterAlterAuthorizedTopics
+ )
+
future.whenComplete { (responses, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
@@ -700,10 +706,10 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
- def createPartitions(request: CreatePartitionsRequestData,
- hasClusterAuth: Boolean,
- getCreatableTopics: Iterable[String] => Set[String])
- : CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
+ def createPartitions(
+ request: CreatePartitionsRequestData,
+ getAlterAuthorizedTopics: Iterable[String] => Set[String]
+ ): CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
val deadlineNs = time.nanoseconds() + NANOSECONDS.convert(request.timeoutMs, MILLISECONDS);
val responses = new util.ArrayList[CreatePartitionsTopicResult]()
val duplicateTopicNames = new util.HashSet[String]()
@@ -721,13 +727,7 @@ class ControllerApis(val requestChannel: RequestChannel,
setErrorMessage("Duplicate topic name."))
topicNames.remove(topicName)
}
- val authorizedTopicNames = {
- if (hasClusterAuth) {
- topicNames.asScala
- } else {
- getCreatableTopics(topicNames.asScala)
- }
- }
+ val authorizedTopicNames = getAlterAuthorizedTopics(topicNames.asScala)
val topics = new util.ArrayList[CreatePartitionsTopic]
topicNames.forEach { topicName =>
if (authorizedTopicNames.contains(topicName)) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a4b38be..90a28e7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1994,7 +1994,7 @@ class KafkaApis(val requestChannel: RequestChannel,
notDuped)(_.name)
val (queuedForDeletion, valid) = authorized.partition { topic =>
- zkSupport.controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic.name)
+ zkSupport.controller.isTopicQueuedForDeletion(topic.name)
}
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index d2e7456..3ca05cf 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -292,7 +292,7 @@ class ZkAdminManager(val config: KafkaConfig,
}
}
- def createPartitions(timeout: Int,
+ def createPartitions(timeoutMs: Int,
newPartitions: Seq[CreatePartitionsTopic],
validateOnly: Boolean,
controllerMutationQuota: ControllerMutationQuota,
@@ -369,7 +369,7 @@ class ZkAdminManager(val config: KafkaConfig,
}
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
- if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
+ if (timeoutMs <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
val results = metadata.map { createPartitionMetadata =>
// ignore topics that already have errors
if (createPartitionMetadata.error.isSuccess && !validateOnly) {
@@ -381,7 +381,7 @@ class ZkAdminManager(val config: KafkaConfig,
callback(results)
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
- val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, callback)
+ val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback)
val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name))
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 2176d23..479bf8e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -727,7 +727,45 @@ class ControllerApisTest {
new CreatePartitionsTopicResult().setName("baz").
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
setErrorMessage(null)),
- controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
+ controllerApis.createPartitions(request, _ => Set("foo", "bar")).get().asScala.toSet)
+ }
+
+ @Test
+ def testCreatePartitionsAuthorization(): Unit = {
+ val controller = new MockController.Builder()
+ .newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q"))
+ .build()
+ val authorizer = mock(classOf[Authorizer])
+ val controllerApis = createControllerApis(Some(authorizer), controller)
+
+ val requestData = new CreatePartitionsRequestData()
+ requestData.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2))
+ requestData.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10))
+ val request = new CreatePartitionsRequest.Builder(requestData).build()
+
+ val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
+ val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
+
+ val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
+ val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
+
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument[util.List[Action]](1).asScala
+ val results = actions.map { action =>
+ if (action == fooAction) AuthorizationResult.ALLOWED
+ else if (action == barAction) AuthorizationResult.DENIED
+ else throw new AssertionError(s"Unexpected action $action")
+ }
+ new util.ArrayList[AuthorizationResult](results.asJava)
+ }
+
+ val response = handleRequest[CreatePartitionsResponse](request, controllerApis)
+ val results = response.data.results.asScala
+ assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
+ assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 98c2666..351cad6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -83,6 +83,8 @@ import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import java.util.Arrays
+import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
+
class KafkaApisTest {
private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
@@ -796,6 +798,72 @@ class KafkaApisTest {
testForwardableApi(ApiKeys.CREATE_TOPICS, requestBuilder)
}
+ @Test
+ def testCreatePartitionsAuthorization(): Unit = {
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+
+ val timeoutMs = 35000
+ val requestData = new CreatePartitionsRequestData()
+ .setTimeoutMs(timeoutMs)
+ .setValidateOnly(false)
+ val fooCreatePartitionsData = new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(2)
+ val barCreatePartitionsData = new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(10)
+ requestData.topics().add(fooCreatePartitionsData)
+ requestData.topics().add(barCreatePartitionsData)
+
+ val fooResource = new ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL)
+ val fooAction = new Action(AclOperation.ALTER, fooResource, 1, true, true)
+
+ val barResource = new ResourcePattern(ResourceType.TOPIC, "bar", PatternType.LITERAL)
+ val barAction = new Action(AclOperation.ALTER, barResource, 1, true, true)
+
+ val actionsCapture = EasyMock.newCapture[util.List[Action]]()
+ EasyMock.expect(authorizer.authorize(
+ anyObject[RequestContext],
+ EasyMock.capture(actionsCapture)
+ )).andAnswer(() => {
+ val actions = actionsCapture.getValue.asScala
+ val results = actions.map { action =>
+ if (action == fooAction) AuthorizationResult.ALLOWED
+ else if (action == barAction) AuthorizationResult.DENIED
+ else throw new AssertionError(s"Unexpected action $action")
+ }
+ new util.ArrayList[AuthorizationResult](results.asJava)
+ })
+
+ val request = buildRequest(new CreatePartitionsRequest.Builder(requestData).build())
+ val capturedResponse = expectNoThrottling(request)
+
+ EasyMock.expect(controller.isActive).andReturn(true)
+ EasyMock.expect(controller.isTopicQueuedForDeletion("foo")).andReturn(false)
+
+ EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+ EasyMock.eq(request), EasyMock.anyShort())
+ ).andReturn(UnboundedControllerMutationQuota)
+
+ val callbackCapture = EasyMock.newCapture[Map[String, ApiError] => Unit]()
+ EasyMock.expect(adminManager.createPartitions(
+ timeoutMs = EasyMock.eq(timeoutMs),
+ newPartitions = EasyMock.eq(Seq(fooCreatePartitionsData)),
+ validateOnly = EasyMock.eq(false),
+ controllerMutationQuota = EasyMock.eq(UnboundedControllerMutationQuota),
+ callback = EasyMock.capture(callbackCapture)
+ )).andAnswer(() => {
+ val callback = callbackCapture.getValue
+ callback.apply(Map("foo" -> ApiError.NONE))
+ })
+
+ EasyMock.replay(authorizer, adminManager, replicaManager, clientRequestQuotaManager,
+ requestChannel, controller, clientControllerQuotaManager)
+ kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = capturedResponse.getValue.asInstanceOf[CreatePartitionsResponse]
+ val results = response.data.results.asScala
+ assertEquals(Some(Errors.NONE), results.find(_.name == "foo").map(result => Errors.forCode(result.errorCode)))
+ assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), results.find(_.name == "bar").map(result => Errors.forCode(result.errorCode)))
+ }
+
private def createTopicAuthorization(authorizer: Authorizer,
operation: AclOperation,
authorizedTopic: String,