You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/06 20:15:33 UTC
kafka git commit: KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics
Repository: kafka
Updated Branches:
refs/heads/trunk fb6ca658d -> 10cd98cc8
KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #3924 from omkreddy/KAFKA-5547-TOPIC-AUTHRO
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10cd98cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10cd98cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10cd98cc
Branch: refs/heads/trunk
Commit: 10cd98cc894b88c5d1e24fc54c66361ad9914df2
Parents: fb6ca65
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Fri Oct 6 12:51:30 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 6 12:51:30 2017 -0700
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 5 +-
.../common/requests/OffsetFetchResponse.java | 2 +
.../src/main/scala/kafka/server/KafkaApis.scala | 198 ++++++++++---------
.../kafka/api/AuthorizerIntegrationTest.scala | 53 ++---
.../kafka/api/EndToEndAuthorizationTest.scala | 4 +-
.../api/SaslEndToEndAuthorizationTest.scala | 6 +-
docs/upgrade.html | 4 +
7 files changed, 145 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 38ca041..5482db7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -786,7 +786,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
future.raise(new CommitFailedException());
return;
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
+ future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
return;
} else {
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
@@ -857,8 +857,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- future.raise(new KafkaException("Partition " + tp + " may not exist or the user may not have " +
- "Describe access to the topic"));
+ future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 4d069fe..e398442 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -85,6 +85,8 @@ public class OffsetFetchResponse extends AbstractResponse {
public static final String NO_METADATA = "";
public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+ public static final PartitionData UNAUTHORIZED_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
+ Errors.TOPIC_AUTHORIZATION_FAILED);
/**
* Possible error codes:
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c171aaa..aa00565 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,25 +264,25 @@ class KafkaApis(val requestChannel: RequestChannel,
}.toMap
sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
} else {
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
- case (topicPartition, _) =>
- val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
- val exists = metadataCache.contains(topicPartition.topic)
- if (!authorizedForDescribe && exists)
- debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
- s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
- authorizedForDescribe && exists
- }
- val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
- case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]()
+
+ for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) {
+ if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedTopics += (topicPartition -> partitionData)
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
val combinedCommitStatus = commitStatus ++
- unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+ nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
@@ -313,7 +313,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: Throwable => (topicPartition, Errors.forException(e))
}
}
- sendResponseCallback(responseInfo)
+ sendResponseCallback(responseInfo.toMap)
} else {
// for version 1 and beyond store offsets in offset manager
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
offsetCommitRequest.groupId,
offsetCommitRequest.memberId,
offsetCommitRequest.generationId,
- partitionData,
+ partitionData.toMap,
sendResponseCallback)
}
}
@@ -381,21 +381,25 @@ class KafkaApis(val requestChannel: RequestChannel,
return
}
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
- produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
- authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
- }
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
- val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
- case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
+ for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
+ if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedRequestInfo += (topicPartition -> memoryRecords)
}
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
val mergedResponseStatus = responseStatus ++
- unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
- nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+ unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+ nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
var errorInResponse = false
@@ -479,21 +483,26 @@ class KafkaApis(val requestChannel: RequestChannel,
val versionId = request.header.apiVersion
val clientId = request.header.clientId
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
- case (tp, _) => authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
- }
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
- val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
- case (tp, _) => authorize(request.session, Read, new Resource(Topic, tp.topic))
+ for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
+ if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedRequestInfo += (topicPartition -> partitionData)
}
- val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
- case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ val nonExistingPartitionData = nonExistingTopics.map {
+ case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
- val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
- case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+ val unauthorizedForReadPartitionData = unauthorizedTopics.map {
+ case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
@@ -538,7 +547,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
+ val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData
val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
@@ -644,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
- new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava)
+ new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava)
)
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
@@ -697,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
- new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
})
@@ -957,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel,
Set.empty[MetadataResponse.TopicMetadata]
else
unauthorizedForDescribeTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
+ new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, java.util.Collections.emptyList()))
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
@@ -1029,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}.toMap
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
} else {
// versions 1 and above read offsets from Kafka
@@ -1050,7 +1059,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (error != Errors.NONE)
offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
else {
- val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+ val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
}
}
@@ -1370,11 +1379,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
-
}
val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
- unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session, topic) ) ++
+ unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly,
@@ -1382,28 +1390,26 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic: String): ApiError = {
- if (authorize(session, Describe, new Resource(Topic, topic)))
- new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)
- else
- new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null)
- }
-
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
val deleteTopicRequest = request.body[DeleteTopicsRequest]
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic =>
- authorize(request.session, Describe, new Resource(Topic, topic)) && metadataCache.contains(topic)
- }
+ var unauthorizedTopics = Set[String]()
+ var nonExistingTopics = Set[String]()
+ var authorizedForDeleteTopics = Set[String]()
- val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
- authorize(request.session, Delete, new Resource(Topic, topic))
+ for (topic <- deleteTopicRequest.topics.asScala) {
+ if (!authorize(request.session, Delete, new Resource(Topic, topic)))
+ unauthorizedTopics += topic
+ else if (!metadataCache.contains(topic))
+ nonExistingTopics += topic
+ else
+ authorizedForDeleteTopics += topic
}
def sendResponseCallback(results: Map[String, Errors]): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
- val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
- unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+ val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++
+ nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results
val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
@@ -1418,12 +1424,12 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(results)
} else {
// If no authorized topics return immediately
- if (authorizedTopics.isEmpty)
+ if (authorizedForDeleteTopics.isEmpty)
sendResponseCallback(Map())
else {
adminManager.deleteTopics(
deleteTopicRequest.timeout.toInt,
- authorizedTopics,
+ authorizedForDeleteTopics,
sendResponseCallback
)
}
@@ -1433,21 +1439,26 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteRecordsRequest(request: RequestChannel.Request) {
val deleteRecordsRequest = request.body[DeleteRecordsRequest]
- val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
- case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
- }
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]()
- val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
- case (topicPartition, _) => authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))
+ for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
+ if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedForDeleteTopics += (topicPartition -> offset)
}
// the callback for sending a DeleteRecordsResponse
def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
val mergedResponseStatus = responseStatus ++
- unauthorizedForDeleteTopics.mapValues(_ =>
+ unauthorizedTopics.map(_ ->
new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
- nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+ nonExistingTopics.map(_ ->
new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
mergedResponseStatus.foreach { case (topicPartition, status) =>
@@ -1646,24 +1657,28 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
- partitionsToAdd.asScala.partition { tp =>
- authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp)
- }
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedPartitions = Set[TopicPartition]()
- val (authorizedPartitions, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { tp =>
- authorize(request.session, Write, new Resource(Topic, tp.topic))
+ for ( topicPartition <- partitionsToAdd.asScala) {
+ if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedPartitions += topicPartition
}
- if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty
- || unauthorizedForWriteRequestInfo.nonEmpty
+ if (unauthorizedTopics.nonEmpty
+ || nonExistingTopics.nonEmpty
|| internalTopics.nonEmpty) {
// Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
// the authorization check to indicate that they were not added to the transaction.
- val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
+ val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+ nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap
@@ -1734,26 +1749,24 @@ class KafkaApis(val requestChannel: RequestChannel,
else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
else {
- val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
- case (topicPartition, _) =>
- val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
- val exists = metadataCache.contains(topicPartition.topic)
- if (!authorizedForDescribe && exists)
- debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
- s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
- s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}")
- authorizedForDescribe && exists
- }
-
- val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
- case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+ var unauthorizedTopics = Set[TopicPartition]()
+ var nonExistingTopics = Set[TopicPartition]()
+ var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
+
+ for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
+ if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+ unauthorizedTopics += topicPartition
+ else if (!metadataCache.contains(topicPartition.topic))
+ nonExistingTopics += topicPartition
+ else
+ authorizedTopics += (topicPartition -> commitedOffset)
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
val combinedCommitStatus = commitStatus ++
- unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
- nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+ nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
if (isDebugEnabled)
combinedCommitStatus.foreach { case (topicPartition, error) =>
@@ -1769,7 +1782,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopics.isEmpty)
sendResponseCallback(Map.empty)
else {
- val offsetMetadata = convertTxnOffsets(authorizedTopics)
+ val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap)
groupCoordinator.handleTxnCommitOffsets(
txnOffsetCommitRequest.consumerGroupId,
txnOffsetCommitRequest.producerId,
@@ -1942,12 +1955,7 @@ class KafkaApis(val requestChannel: RequestChannel,
private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
val error = resource.`type` match {
case RResourceType.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
- case RResourceType.TOPIC =>
- // Don't leak topic name unless the user has describe topic permission
- if (authorize(session, Describe, new Resource(Topic, resource.name)))
- Errors.TOPIC_AUTHORIZATION_FAILED
- else
- Errors.UNKNOWN_TOPIC_OR_PARTITION
+ case RResourceType.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
}
new ApiError(error, null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d07d08e..522fcd3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -382,6 +382,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
+ private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
+
+ private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
+
@Test
def testAuthorizationWithTopicExisting() {
@@ -413,26 +417,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
- ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
+ ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+ ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+ ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
)
for ((key, request) <- requestKeyToRequest) {
removeAllAcls()
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized)
removeAllAcls()
}
for ((resource, acls) <- resourceToAcls)
addAndVerifyAcls(acls, resource)
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
}
}
@@ -447,7 +453,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
- ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
ApiKeys.PRODUCE -> createProduceRequest,
ApiKeys.FETCH -> createFetchRequest,
@@ -455,26 +460,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
- ApiKeys.DELETE_RECORDS -> deleteRecordsRequest
+ ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
+ ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+ ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+ ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
)
for ((key, request) <- requestKeyToRequest) {
removeAllAcls()
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, topicExists = false)
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, topicExists = false)
removeAllAcls()
}
for ((resource, acls) <- resourceToAcls)
addAndVerifyAcls(acls, resource)
- sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, topicExists = false)
}
}
@@ -484,7 +492,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRecords(numRecords, tp)
fail("should have thrown exception")
} catch {
- case _: TimeoutException => //expected
+ case _: TopicAuthorizationException => //expected
}
}
@@ -534,8 +542,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRecords(numRecords, topicPartition)
}
- @Test(expected = classOf[GroupAuthorizationException])
- def testConsumeWithNoAccess(): Unit = {
+ @Test(expected = classOf[TopicAuthorizationException])
+ def testConsumeUsingAssignWithNoAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
@@ -893,10 +901,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
this.consumers.head.position(tp)
}
- @Test
+ @Test(expected = classOf[TopicAuthorizationException])
def testListOffsetsWithNoTopicAccess() {
- val partitionInfos = this.consumers.head.partitionsFor(topic)
- assertNull(partitionInfos)
+ this.consumers.head.partitionsFor(topic)
}
@Test
@@ -935,7 +942,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
}
@Test
@@ -963,7 +970,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
val version = ApiKeys.DELETE_RECORDS.latestVersion
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteRecordsResponse.responses.asScala.head._2.error)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteRecordsResponse.responses.asScala.head._2.error)
}
@Test
@@ -990,7 +997,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
val version = ApiKeys.CREATE_PARTITIONS.latestVersion
val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, createPartitionsResponse.errors.asScala.head._2.error)
}
@Test
@@ -1240,7 +1247,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
request: AbstractRequest,
resources: Set[ResourceType],
isAuthorized: Boolean,
- isAuthorizedTopicDescribe: Boolean,
topicExists: Boolean = true): AbstractResponse = {
val resp = connectAndSend(request, apiKey)
val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
@@ -1251,8 +1257,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
if (resourceType == Topic) {
if (isAuthorized)
Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
- else if (!isAuthorizedTopicDescribe)
- Set(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
Set(Topic.error)
} else {
@@ -1266,9 +1270,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
else
assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
else if (resources == Set(Topic))
- assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
- else
- assertNotEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
+ if (isAuthorized)
+ assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
+ else
+ assertEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index bbb3249..720d8b6 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -214,7 +214,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Tests that a producer fails to publish messages when the appropriate ACL
* isn't set.
*/
- @Test(expected = classOf[TimeoutException])
+ @Test(expected = classOf[TopicAuthorizationException])
def testNoProduceWithoutDescribeAcl(): Unit = {
sendRecords(numRecords, tp)
}
@@ -246,7 +246,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumeRecords(this.consumers.head)
}
- @Test(expected = classOf[TimeoutException])
+ @Test(expected = classOf[TopicAuthorizationException])
def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
noConsumeWithoutDescribeAclSetup()
consumers.head.subscribe(List(topic).asJava)
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index a366b1d..fb6bee8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
import kafka.utils.TestUtils
import kafka.utils.Implicits._
import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.errors.GroupAuthorizationException
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.errors.TopicAuthorizationException
import org.junit.{Before, Test}
import scala.collection.immutable.List
@@ -77,9 +77,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
try {
consumeRecords(consumer2)
- fail("Expected exception as consumer2 has no access to group")
+ fail("Expected exception as consumer2 has no access to topic")
} catch {
- case _: GroupAuthorizationException => //expected
+ case _: TopicAuthorizationException => //expected
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5872c7c..862dadb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -86,6 +86,10 @@
inversion bug, it was previously enabled by default and disabled if <code>kafka_mx4jenable</code> was set to <code>true</code>.</li>
<li>The package <code>org.apache.kafka.common.security.auth</code> in the clients jar has been made public and added to the javadocs.
Internal classes which had previously been located in this package have been moved elsewhere.</li>
+ <li>When using an Authorizer and a user doesn't have the required permissions on a topic, the broker
+ will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of whether the topic exists on the broker.
+ If the user has the required permissions and the topic doesn't exist, then the UNKNOWN_TOPIC_OR_PARTITION
+ error code will be returned.</li>
</ul>
<h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>