Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/01 06:08:44 UTC
kafka git commit: KAFKA-3396; Ensure Describe access is required to detect topic existence
Repository: kafka
Updated Branches:
refs/heads/trunk f8b69aacd -> 8124f6e09
KAFKA-3396; Ensure Describe access is required to detect topic existence
Reopening of https://github.com/apache/kafka/pull/1428
Author: Edoardo Comar <ec...@uk.ibm.com>
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #1908 from edoardocomar/KAFKA-3396
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8124f6e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8124f6e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8124f6e0
Branch: refs/heads/trunk
Commit: 8124f6e0996cb673760750b3aba004ae11e34c6a
Parents: f8b69aa
Author: Edoardo Comar <ec...@uk.ibm.com>
Authored: Fri Sep 30 23:07:51 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 30 23:07:51 2016 -0700
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 6 +
.../src/main/scala/kafka/admin/AdminUtils.scala | 4 +-
.../security/auth/SimpleAclAuthorizer.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 132 ++++++-----
.../main/scala/kafka/server/MetadataCache.scala | 6 -
.../kafka/api/AuthorizerIntegrationTest.scala | 233 ++++++++++++++++---
.../kafka/api/EndToEndAuthorizationTest.scala | 176 +++++++++++---
.../kafka/api/IntegrationTestHarness.scala | 48 +++-
.../unit/kafka/admin/DeleteTopicTest.scala | 6 +-
.../kafka/server/DeleteTopicsRequestTest.scala | 4 +-
docs/upgrade.html | 2 +
11 files changed, 474 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
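The user-visible effect of the change, before the per-file diffs: wherever a principal lacks Describe on a topic, the broker now answers as if the topic did not exist (UNKNOWN_TOPIC_OR_PARTITION) instead of admitting its existence with TOPIC_AUTHORIZATION_FAILED. A minimal client-side sketch of what that looks like, assuming a broker running with an Authorizer and a principal that has no Describe ACL on "secret-topic" (broker address and topic name are illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer

    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // Without Describe the topic is indistinguishable from a nonexistent one:
    // partitionsFor returns null rather than raising TopicAuthorizationException
    // (this is exactly what testListOffsetsWithNoTopicAccess below asserts).
    println(consumer.partitionsFor("secret-topic")) // null
    consumer.close()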
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ff0d669..27d6a75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -670,6 +670,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
resetGeneration();
future.raise(new CommitFailedException());
return;
+ } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
+ future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
+ return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
@@ -731,6 +735,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// re-discover the coordinator and retry
coordinatorDead();
future.raise(error);
+ } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
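A consequence of the two hunks above: offset commits and offset fetches against a topic the principal cannot Describe now fail with a generic KafkaException whose message deliberately conflates "does not exist" with "no Describe access". A sketch of client code handling it, assuming an already-configured KafkaConsumer named consumer:

    import scala.collection.JavaConverters._
    import org.apache.kafka.common.{KafkaException, TopicPartition}
    import org.apache.kafka.clients.consumer.OffsetAndMetadata

    // assumes: val consumer: KafkaConsumer[Array[Byte], Array[Byte]] is configured
    val tp = new TopicPartition("secret-topic", 0) // illustrative topic
    try {
      consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
    } catch {
      // Raised when the partition does not exist OR the principal lacks
      // Describe on the topic; the broker makes the two cases indistinguishable.
      case e: KafkaException => println(s"commit failed: ${e.getMessage}")
    }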
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 0273bdb..7873028 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -26,7 +26,7 @@ import kafka.utils.ZkUtils._
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.Node
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
@@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities {
case e2: Throwable => throw new AdminOperationException(e2)
}
} else {
- throw new InvalidTopicException("topic %s to delete does not exist".format(topic))
+ throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic))
}
}
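Callers of AdminUtils.deleteTopic must now catch the new exception type. A short sketch, assuming a reachable ZooKeeper at the illustrative address below:

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException

    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)
    try AdminUtils.deleteTopic(zkUtils, "no-such-topic")
    catch {
      // before this commit the same condition raised InvalidTopicException
      case _: UnknownTopicOrPartitionException => println("topic does not exist")
    } finally zkUtils.close()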
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index a36a07d..42bfebf 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -127,9 +127,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
//check if there is any Deny acl match that would disallow this operation.
val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
- //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny.
+ //if principal is allowed to read, write or delete, we allow describe by default; the reverse does not apply to Deny.
val ops = if (Describe == operation)
- Set[Operation](operation, Read, Write)
+ Set[Operation](operation, Read, Write, Delete)
else
Set[Operation](operation)
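In other words, an Allow ACL for Read, Write or (newly) Delete now also satisfies a Describe check. A standalone sketch of that rule, simplified to ignore Deny precedence and host matching, using hypothetical stand-in types:

    sealed trait Operation
    case object Read extends Operation
    case object Write extends Operation
    case object Delete extends Operation
    case object Describe extends Operation

    // Operations whose Allow ACLs satisfy the requested operation.
    def impliedBy(requested: Operation): Set[Operation] =
      if (requested == Describe) Set(Describe, Read, Write, Delete) // Delete is the newly added implier
      else Set(requested)

    assert(impliedBy(Describe).contains(Delete)) // a Delete ACL now grants Describe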
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 85c47e6..d765c8a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -233,44 +233,48 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBody = new OffsetCommitResponse(results.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
- // filter non-existent topics
- val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
- !metadataCache.contains(topicPartition.topic)
+ val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
+ case (topicPartition, _) => {
+ val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
+ val exists = metadataCache.contains(topicPartition.topic)
+ if (!authorizedForDescribe && exists)
+ debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+ s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
+ authorizedForDescribe && exists
+ }
}
- val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
- val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
- case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
+ val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
+ case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
- val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
-
- mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
- if (errorCode != Errors.NONE.code) {
- debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
- s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+ val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
+ unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
+ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+
+ if (logger.isDebugEnabled()) //avoid iterating the statuses when debug logging is disabled
+ combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
+ if (errorCode != Errors.NONE.code) {
+ debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+ s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+ }
}
- }
- val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
-
val responseHeader = new ResponseHeader(header.correlationId)
val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
- if (authorizedRequestInfo.isEmpty)
+ if (authorizedTopics.isEmpty)
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0 always store offsets to ZK
- val responseInfo = authorizedRequestInfo.map {
+ val responseInfo = authorizedTopics.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
- if (!metadataCache.hasTopicMetadata(topicPartition.topic))
- (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+ if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
else {
zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
@@ -301,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// - If v2 we use the default expiration timestamp
val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
- val partitionData = authorizedRequestInfo.mapValues { partitionData =>
+ val partitionData = authorizedTopics.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
@@ -336,15 +340,22 @@ class KafkaApis(val requestChannel: RequestChannel,
val produceRequest = request.body.asInstanceOf[ProduceRequest]
val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
- val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
+ val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
+ case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+ }
+
+ val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
}
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
- val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ =>
- new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
+ val mergedResponseStatus = responseStatus ++
+ unauthorizedForWriteRequestInfo.mapValues(_ =>
+ new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++
+ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+ new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))
var errorInResponse = false
@@ -432,11 +443,18 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
+ val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.requestInfo.partition {
+ case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicAndPartition.topic)) && metadataCache.contains(topicAndPartition.topic)
+ }
+
+ val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
}
- val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) =>
+ val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { case (tp, _) =>
+ (tp, FetchResponsePartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MessageSet.Empty))
+ }
+ val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { case (tp, _) =>
(tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
}
@@ -466,7 +484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else responsePartitionData
- val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData
+ val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
mergedPartitionData.foreach { case (topicAndPartition, data) =>
if (data.error != Errors.NONE.code)
@@ -554,7 +572,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
- new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
+ new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava)
)
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
@@ -605,7 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
- new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
+ new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
})
@@ -775,7 +793,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
- new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+ new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,
java.util.Collections.emptyList())
}
}
@@ -804,25 +822,34 @@ class KafkaApis(val requestChannel: RequestChannel,
metadataRequest.topics.asScala.toSet
}
- var (authorizedTopics, unauthorizedTopics) =
+ var (authorizedTopics, unauthorizedForDescribeTopics) =
topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))
+ var unauthorizedForCreateTopics = Set[String]()
+
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
- authorizer.foreach { az =>
- if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
- authorizedTopics --= nonExistingTopics
- unauthorizedTopics ++= nonExistingTopics
- }
+ if (!authorize(request.session, Create, Resource.ClusterResource)) {
+ authorizedTopics --= nonExistingTopics
+ unauthorizedForCreateTopics ++= nonExistingTopics
}
}
}
- val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
- new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
+ val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
+ new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
java.util.Collections.emptyList()))
+ // do not disclose the existence of topics unauthorized for Describe, so we have not even checked whether they exist
+ val unauthorizedForDescribeTopicMetadata =
+ // For an all-topics request, don't include topics unauthorized for Describe
+ if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
+ Set.empty[MetadataResponse.TopicMetadata]
+ else
+ unauthorizedForDescribeTopics.map(topic =>
+ new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
+
// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
@@ -832,7 +859,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else
getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
- val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
+ val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
val brokers = metadataCache.getAliveBrokers
@@ -869,16 +896,15 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
- val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
- val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap
if (header.apiVersion == 0) {
// version 0 reads offsets from ZK
val responseInfo = authorizedTopicPartitions.map { topicPartition =>
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
try {
- if (!metadataCache.hasTopicMetadata(topicPartition.topic))
+ if (!metadataCache.contains(topicPartition.topic))
(topicPartition, unknownTopicPartitionResponse)
else {
val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
@@ -1169,21 +1195,17 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDeleteTopicsRequest(request: RequestChannel.Request) {
val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest]
- val (authorizedTopics, unauthorizedTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
- authorize(request.session, Delete, new Resource(auth.Topic, topic))
+ val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
+ authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic)
)
- val unauthorizedResults = unauthorizedTopics.map ( topic =>
- // Avoid leaking that the topic exists if the user is not authorized to describe the topic
- if (authorize(request.session, Describe, new Resource(auth.Topic, topic))) {
- (topic, Errors.TOPIC_AUTHORIZATION_FAILED)
- } else {
- (topic, Errors.INVALID_TOPIC_EXCEPTION)
- }
- ).toMap
-
+ val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition( topic =>
+ authorize(request.session, Delete, new Resource(auth.Topic, topic))
+ )
+
def sendResponseCallback(results: Map[String, Errors]): Unit = {
- val completeResults = results ++ unauthorizedResults
+ val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map( topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+ unauthorizedForDeleteTopics.map( topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
val respHeader = new ResponseHeader(request.header.correlationId)
val responseBody = new DeleteTopicsResponse(completeResults.asJava)
trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
@@ -1196,7 +1218,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}.toMap
sendResponseCallback(results)
} else {
- // If no authorized topics return immediatly
+ // If no authorized topics return immediately
if (authorizedTopics.isEmpty)
sendResponseCallback(Map())
else {
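Across the OffsetCommit, Produce, Fetch and DeleteTopics handlers above, the same two-stage filter appears: first partition the request by "Describe-authorized and existing" (everything else maps to UNKNOWN_TOPIC_OR_PARTITION, hiding existence), then partition the survivors by the operation-specific ACL (failures map to TOPIC_AUTHORIZATION_FAILED). A distilled sketch of the pattern with the predicates stubbed out (the real code consults the Authorizer and the MetadataCache):

    // Stand-ins for authorize(session, Describe, ...), metadataCache.contains(...)
    // and authorize(session, Read/Write/Delete, ...).
    def canDescribe(topic: String): Boolean = ???
    def exists(topic: String): Boolean = ???
    def canOperate(topic: String): Boolean = ???

    def classify(topics: Set[String]): (Set[String], Set[String], Set[String]) = {
      val (visible, hidden) = topics.partition(t => canDescribe(t) && exists(t))
      val (authorized, denied) = visible.partition(canOperate)
      // hidden     -> UNKNOWN_TOPIC_OR_PARTITION (existence not disclosed)
      // denied     -> TOPIC_AUTHORIZATION_FAILED
      // authorized -> handled normally
      (authorized, denied, hidden)
    }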
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f493e7d..feef6ae 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -120,12 +120,6 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
- def hasTopicMetadata(topic: String): Boolean = {
- inReadLock(partitionMetadataLock) {
- cache.contains(topic)
- }
- }
-
def getAllTopics(): Set[String] = {
inReadLock(partitionMetadataLock) {
cache.keySet.toSet
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 6d3b098..be41581 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -15,6 +15,7 @@ package kafka.api
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.ExecutionException
+import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import kafka.common
@@ -22,7 +23,8 @@ import kafka.common.TopicAndPartition
import kafka.security.auth._
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -36,6 +38,14 @@ import org.junit.{After, Assert, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+import org.apache.kafka.common.KafkaException
+import java.util.HashMap
+import kafka.admin.AdminUtils
class AuthorizerIntegrationTest extends BaseRequestTest {
@@ -43,12 +53,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val brokerId: Integer = 0
val topic = "topic"
+ val topicPattern = "topic.*"
val createTopic = "topic-new"
val deleteTopic = "topic-delete"
val part = 0
val correlationId = 0
val clientId = "client-Id"
val tp = new TopicPartition(topic, part)
+
val topicAndPartition = new TopicAndPartition(topic, part)
val group = "my-group"
val topicResource = new Resource(Topic, topic)
@@ -163,6 +175,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@After
override def tearDown() = {
producers.foreach(_.close())
+ consumers.foreach(_.wakeup())
consumers.foreach(_.close())
removeAllAcls
super.tearDown()
@@ -276,14 +289,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
+ /*
+ * check that, whether or not the topic exists, unauthorized FETCH, PRODUCE and DELETE_TOPICS requests do not leak the topic name
+ */
+ @Test
+ def testAuthorizationWithTopicNotExisting() {
+ AdminUtils.deleteTopic(zkUtils, topic)
+ TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
+ AdminUtils.deleteTopic(zkUtils, deleteTopic)
+ TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
+
+ val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+ ApiKeys.PRODUCE -> createProduceRequest,
+ ApiKeys.FETCH -> createFetchRequest,
+ ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
+ )
+
+ for ((key, request) <- requestKeyToRequest) {
+ removeAllAcls
+ val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+ sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, topicExists = false)
+ for ((resource, acls) <- RequestKeysToAcls(key))
+ addAndVerifyAcls(acls, resource)
+ sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, topicExists = false)
+ }
+ }
+
@Test
def testProduceWithNoTopicAccess() {
try {
sendRecords(numRecords, tp)
- fail("sendRecords should have thrown")
+ fail("should have thrown exception")
} catch {
- case e: TopicAuthorizationException =>
- assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
+ case e: TimeoutException => //expected
}
}
@@ -292,7 +330,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
try {
sendRecords(numRecords, tp)
- fail("sendRecords should have thrown")
+ fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
@@ -304,7 +342,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
try {
sendRecords(numRecords, tp)
- fail("sendRecords should have thrown")
+ fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
@@ -375,7 +413,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
- def testConsumeWithNoTopicAccess() {
+ def testConsumeWithoutTopicDescribeAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
@@ -386,7 +424,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
- case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+ case e: KafkaException => //expected
}
}
@@ -403,7 +441,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
- case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+ case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
}
@@ -421,7 +459,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
- assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+ assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
}
@@ -438,6 +476,125 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
+ def testPatternSubscriptionWithNoTopicAccess() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ this.consumers.head.poll(50)
+ assertTrue(this.consumers.head.subscription.isEmpty)
+ }
+
+ @Test
+ def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ val consumer = consumers.head
+ consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ try {
+ consumeRecords(consumer)
+ Assert.fail("Expected TopicAuthorizationException")
+ } catch {
+ case e: TopicAuthorizationException => //expected
+ }
+
+ }
+
+ @Test
+ def testPatternSubscriptionWithTopicAndGroupRead() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+
+ //create an unmatched topic
+ val unmatchedTopic = "unmatched"
+ TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
+ sendRecords(1, new TopicPartition(unmatchedTopic, part))
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ val consumer = consumers.head
+ consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ consumeRecords(consumer)
+
+ // set the subscription pattern to an internal topic that the consumer has no read permission for, but since
+ // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception
+ // should be thrown
+ consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
+ assertTrue(consumer.poll(50).isEmpty)
+ }
+
+ @Test
+ def testPatternSubscriptionMatchingInternalTopicWithNoPermission() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+
+ val consumerConfig = new Properties
+ consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+ securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+ try {
+ consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+ consumeRecords(consumer)
+ assertEquals(Set[String](topic).asJava, consumer.subscription)
+ } finally consumer.close()
+ }
+
+ @Test
+ def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
+
+ val consumerConfig = new Properties
+ consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+ securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+ try {
+ consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+ consumeRecords(consumer)
+ Assert.fail("Expected TopicAuthorizationException")
+ } catch {
+ case e: TopicAuthorizationException => //expected
+ } finally consumer.close()
+ }
+
+ @Test
+ def testPatternSubscriptionNotMatchingInternalTopic() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+
+ val consumerConfig = new Properties
+ consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+ val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+ securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+ try {
+ consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ consumeRecords(consumer)
+ } finally consumer.close()
+  }
+
+ @Test
def testCreatePermissionNeededToReadFromNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
@@ -451,7 +608,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
- assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics());
+ assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
@@ -466,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
- @Test(expected = classOf[TopicAuthorizationException])
+ @Test(expected = classOf[KafkaException])
def testCommitWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
@@ -512,7 +669,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
this.consumers.head.position(tp)
}
- @Test(expected = classOf[TopicAuthorizationException])
+ @Test(expected = classOf[KafkaException])
def testOffsetFetchWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
@@ -537,14 +694,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testListOffsetsWithNoTopicAccess() {
- val e = intercept[TopicAuthorizationException] {
- this.consumers.head.partitionsFor(topic)
- }
- assertEquals(Set(topic), e.unauthorizedTopics().asScala)
+ val partitionInfos = this.consumers.head.partitionsFor(topic)
+ assertNull(partitionInfos)
}
@Test
- def testListOfsetsWithTopicDescribe() {
+ def testListOffsetsWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.partitionsFor(topic)
}
@@ -554,7 +709,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val deleteResponse = DeleteTopicsResponse.parse(response)
- assertEquals(Errors.INVALID_TOPIC_EXCEPTION, deleteResponse.errors.asScala.head._2)
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2)
}
@Test
@@ -585,24 +740,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys,
request: AbstractRequest,
resources: Set[ResourceType],
- isAuthorized: Boolean): AbstractRequestResponse = {
+ isAuthorized: Boolean,
+ topicExists: Boolean = true): AbstractRequestResponse = {
val resp = send(request, apiKey)
val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response)
val possibleErrorCodes = resources.flatMap { resourceType =>
- if(resourceType == Topic)
- // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names
- Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code())
+ if (resourceType == Topic)
+ // When completely unauthorized topic resources must return an UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names
+ Seq(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
else
- Seq(resourceType.errorCode)
+ Seq(resourceType.errorCode)
}
- if (isAuthorized)
- assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
+ if (topicExists)
+ if (isAuthorized)
+ assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
+ else
+ assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
else
- assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
-
+ assertEquals(s"${apiKey} - Found error code $errorCode", Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), errorCode)
+
response
}
@@ -634,16 +793,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
topic: String = topic,
part: Int = part) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
- val maxIters = numRecords * 50
- var iters = 0
- while (records.size < numRecords) {
- for (record <- consumer.poll(50).asScala) {
- records.add(record)
- }
- if (iters > maxIters)
- throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
- iters += 1
+
+ val future = Future {
+ while (records.size < numRecords)
+ for (record <- consumer.poll(50).asScala)
+ records.add(record)
+ records
}
+ val result = Await.result(future, 10 seconds)
+
for (i <- 0 until numRecords) {
val record = records.get(i)
val offset = startingOffset + i
@@ -652,4 +810,5 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(offset.toLong, record.offset())
}
}
+
}
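The rewritten consumeRecords above swaps the bounded poll loop for a Future awaited with a hard deadline, so a consumer that can never see the topic fails fast with java.util.concurrent.TimeoutException instead of spinning. The pattern, distilled (poll() below is a hypothetical stand-in for the consumer.poll(50) loop body):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    def pollUntil(expected: Int)(poll: () => Int): Unit = {
      val future = Future {
        var seen = 0
        while (seen < expected)
          seen += poll() // returns how many records this poll delivered
      }
      // throws java.util.concurrent.TimeoutException if the records never arrive
      Await.result(future, 10.seconds)
    }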
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 8edb6f8..2f5858c 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
import java.io.File
import java.util.ArrayList
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, TimeoutException => JTimeoutException}
import kafka.admin.AclCommand
import kafka.common.TopicAndPartition
@@ -28,16 +28,19 @@ import kafka.server._
import kafka.utils._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig}
-import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig}
+import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig, KafkaProducer}
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{TopicPartition}
+import org.apache.kafka.common.{TopicPartition,KafkaException}
import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException}
+import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException}
import org.junit.Assert._
import org.junit.{Test, After, Before}
import scala.collection.JavaConverters._
-
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
/**
* The test cases here verify that a producer authorized to publish to a topic
@@ -107,6 +110,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
s"--topic=$topic",
s"--producer",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+ def describeAclArgs: Array[String] = Array("--authorizer-properties",
+ s"zookeeper.connect=$zkConnect",
+ s"--add",
+ s"--topic=$topic",
+ s"--operation=Describe",
+ s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+ def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
+ s"zookeeper.connect=$zkConnect",
+ s"--remove",
+ s"--force",
+ s"--topic=$topic",
+ s"--operation=Describe",
+ s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+ def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
+ s"zookeeper.connect=$zkConnect",
+ s"--remove",
+ s"--force",
+ s"--topic=$topic",
+ s"--operation=Write",
+ s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def consumeAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
@@ -149,18 +172,28 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
}
super.setUp
AclCommand.main(topicBrokerReadAclArgs)
- servers.foreach( s =>
+ servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
- )
+ }
// create the test topic with all the brokers as replicas
TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers)
}
+ override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ TestUtils.createNewProducer(brokerList,
+ maxBlockMs = 5000L,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.saslProperties,
+ props = Some(producerConfig))
+ }
+
/**
* Closes MiniKDC last when tearing down.
*/
@After
override def tearDown {
+ consumers.foreach(_.wakeup())
super.tearDown
closeSasl()
}
@@ -187,10 +220,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
private def setAclsAndProduce() {
AclCommand.main(produceAclArgs)
AclCommand.main(consumeAclArgs)
- servers.foreach(s => {
+ servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
- })
+ }
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
@@ -203,35 +236,93 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
* isn't set.
*/
@Test
- def testNoProduceAcl {
+ def testNoProduceWithoutDescribeAcl {
//Produce records
debug("Starting to send records")
try{
sendRecords(numRecords, tp)
- fail("Topic authorization exception expected")
+ fail("exception expected")
} catch {
- case e: TopicAuthorizationException => //expected
+ case e: TimeoutException => //expected
}
}
- /**
+ @Test
+ def testNoProduceWithDescribeAcl {
+ AclCommand.main(describeAclArgs)
+ servers.foreach { s =>
+ TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ }
+ //Produce records
+ debug("Starting to send records")
+ try{
+ sendRecords(numRecords, tp)
+ fail("exception expected")
+ } catch {
+ case e: TopicAuthorizationException => //expected
+ }
+ }
+
+ /**
* Tests that a consumer fails to consume messages without the appropriate
* ACL set.
*/
@Test
- def testNoConsumeAcl {
+ def testNoConsumeWithoutDescribeAclViaAssign {
+ noConsumeWithoutDescribeAclSetup
+ consumers.head.assign(List(tp).asJava)
+
+ try {
+ consumeRecords(this.consumers.head)
+ fail("exception expected")
+ } catch {
+ case e: KafkaException => //expected
+ }
+ }
+
+ @Test
+ def testNoConsumeWithoutDescribeAclViaSubscribe {
+ noConsumeWithoutDescribeAclSetup
+ consumers.head.subscribe(List(topic).asJava)
+
+ try {
+ consumeRecords(this.consumers.head)
+ fail("exception expected")
+ } catch {
+ case e: JTimeoutException => //expected
+ }
+ }
+
+ private def noConsumeWithoutDescribeAclSetup {
AclCommand.main(produceAclArgs)
AclCommand.main(groupAclArgs)
- servers.foreach(s => {
+ servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
- })
+ }
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
- //Consume records
+
+ //Deleting topic ACL
+ AclCommand.main(deleteDescribeAclArgs)
+ AclCommand.main(deleteWriteAclArgs)
+ servers.foreach { s =>
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+ }
+
debug("Finished sending and starting to consume records")
+ }
+
+ /**
+ * Tests that a consumer fails to consume messages without the appropriate
+ * ACL set.
+ */
+ @Test
+ def testNoConsumeWithDescribeAclViaAssign {
+ noConsumeWithDescribeAclSetup
consumers.head.assign(List(tp).asJava)
+
try {
consumeRecords(this.consumers.head)
fail("Topic authorization exception expected")
@@ -239,6 +330,33 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
case e: TopicAuthorizationException => //expected
}
}
+
+ @Test
+ def testNoConsumeWithDescribeAclViaSubscribe {
+ noConsumeWithDescribeAclSetup
+ consumers.head.subscribe(List(topic).asJava)
+
+ try {
+ consumeRecords(this.consumers.head)
+ fail("Topic authorization exception expected")
+ } catch {
+ case e: TopicAuthorizationException => //expected
+ }
+ }
+
+ private def noConsumeWithDescribeAclSetup {
+ AclCommand.main(produceAclArgs)
+ AclCommand.main(groupAclArgs)
+ servers.foreach { s =>
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+ }
+ //Produce records
+ debug("Starting to send records")
+ sendRecords(numRecords, tp)
+ //Consume records
+ debug("Finished sending and starting to consume records")
+ }
/**
* Tests that a consumer fails to consume messages without the appropriate
@@ -247,9 +365,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
@Test
def testNoGroupAcl {
AclCommand.main(produceAclArgs)
- servers.foreach(s =>
+ servers.foreach { s =>
TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
- )
+ }
//Produce records
debug("Starting to send records")
sendRecords(numRecords, tp)
@@ -283,22 +401,22 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
topic: String = topic,
part: Int = part) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
- val maxIters = numRecords * 50
- var iters = 0
- while (records.size < numRecords) {
- for (record <- consumer.poll(50).asScala) {
- records.add(record)
- }
- if (iters > maxIters)
- throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
- iters += 1
+
+ val future = Future {
+ while (records.size < numRecords)
+ for (record <- consumer.poll(50).asScala)
+ records.add(record)
+ records
}
+ val result = Await.result(future, 10 seconds)
+
for (i <- 0 until numRecords) {
val record = records.get(i)
val offset = startingOffset + i
assertEquals(topic, record.topic())
assertEquals(part, record.partition())
assertEquals(offset.toLong, record.offset())
- }
+ }
}
}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 9595ad6..ffca431 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -29,6 +29,8 @@ import kafka.integration.KafkaServerTestHarness
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
+import scala.util.control.Breaks.{breakable, break}
+import java.util.ConcurrentModificationException
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
@@ -64,17 +66,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.putAll(consumerSecurityProps)
for (i <- 0 until producerCount)
- producers += TestUtils.createNewProducer(brokerList,
- securityProtocol = this.securityProtocol,
- trustStoreFile = this.trustStoreFile,
- saslProperties = this.saslProperties,
- props = Some(producerConfig))
+ producers += createNewProducer
for (i <- 0 until consumerCount) {
- consumers += TestUtils.createNewConsumer(brokerList,
- securityProtocol = this.securityProtocol,
- trustStoreFile = this.trustStoreFile,
- saslProperties = this.saslProperties,
- props = Some(consumerConfig))
+ consumers += createNewConsumer
}
// create the consumer offset topic
@@ -85,10 +79,42 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
servers.head.groupCoordinator.offsetsTopicConfigs)
}
+ //extracted into a method so that specific tests can create producers with different parameters
+ def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ TestUtils.createNewProducer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.saslProperties,
+ props = Some(producerConfig))
+ }
+
+ //extracted into a method so that specific tests can create consumers with different parameters
+ def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+ TestUtils.createNewConsumer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.saslProperties,
+ props = Some(consumerConfig))
+ }
+
@After
override def tearDown() {
producers.foreach(_.close())
- consumers.foreach(_.close())
+
+ consumers.foreach { consumer =>
+ breakable {
+ while(true) {
+ try {
+ consumer.close
+ break
+ } catch {
+ //short wait so that a woken-up consumer can be closed without a spurious ConcurrentModificationException
+ case e: ConcurrentModificationException => Thread.sleep(100L)
+ }
+ }
+ }
+ }
+
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ea5a213..d39de75 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -26,7 +26,7 @@ import org.junit.Test
import java.util.Properties
import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
class DeleteTopicTest extends ZooKeeperTestHarness {
@@ -206,9 +206,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// start topic deletion
try {
AdminUtils.deleteTopic(zkUtils, "test2")
- fail("Expected InvalidTopicException")
+ fail("Expected UnknownTopicOrPartitionException")
} catch {
- case e: InvalidTopicException => // expected exception
+ case e: UnknownTopicOrPartitionException => // expected exception
}
// verify delete topic path for test2 is removed from zookeeper
TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index a59316b..e04e1b7 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -58,7 +58,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
// Basic
validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava, timeout),
- Map("invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION))
+ Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
// Partial
TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers)
@@ -67,7 +67,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
"partial-invalid-topic").asJava, timeout),
Map(
"partial-topic-1" -> Errors.NONE,
- "partial-invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION
+ "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
)
)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7b16ab0..1b1c593 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,6 +64,8 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
<li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
<li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
<li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
+ <li> When using an Authorizer and a user does not have <b>Describe</b> authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors
+ but UNKNOWN_TOPIC_OR_PARTITION errors instead, to avoid leaking topic names.</li>
</ul>
<h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>
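For operators: a principal that should be able to detect topic existence (but not read or write it) can be granted Describe explicitly. A sketch using AclCommand with the same flags as the test's describeAclArgs above (principal, topic and ZooKeeper address are illustrative):

    import kafka.admin.AclCommand

    AclCommand.main(Array(
      "--authorizer-properties", "zookeeper.connect=localhost:2181",
      "--add",
      "--topic=my-topic",
      "--operation=Describe",
      "--allow-principal=User:alice"))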