You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/01 06:08:44 UTC

kafka git commit: KAFKA-3396; Ensure Describe access is required to detect topic existence

Repository: kafka
Updated Branches:
  refs/heads/trunk f8b69aacd -> 8124f6e09


KAFKA-3396; Ensure Describe access is required to detect topic existence

Reopening of https://github.com/apache/kafka/pull/1428

Author: Edoardo Comar <ec...@uk.ibm.com>
Author: Mickael Maison <mi...@gmail.com>

Reviewers: Grant Henke <gr...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #1908 from edoardocomar/KAFKA-3396


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8124f6e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8124f6e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8124f6e0

Branch: refs/heads/trunk
Commit: 8124f6e0996cb673760750b3aba004ae11e34c6a
Parents: f8b69aa
Author: Edoardo Comar <ec...@uk.ibm.com>
Authored: Fri Sep 30 23:07:51 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Sep 30 23:07:51 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |   6 +
 .../src/main/scala/kafka/admin/AdminUtils.scala |   4 +-
 .../security/auth/SimpleAclAuthorizer.scala     |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 132 ++++++-----
 .../main/scala/kafka/server/MetadataCache.scala |   6 -
 .../kafka/api/AuthorizerIntegrationTest.scala   | 233 ++++++++++++++++---
 .../kafka/api/EndToEndAuthorizationTest.scala   | 176 +++++++++++---
 .../kafka/api/IntegrationTestHarness.scala      |  48 +++-
 .../unit/kafka/admin/DeleteTopicTest.scala      |   6 +-
 .../kafka/server/DeleteTopicsRequestTest.scala  |   4 +-
 docs/upgrade.html                               |   2 +
 11 files changed, 474 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ff0d669..27d6a75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -670,6 +670,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     resetGeneration();
                     future.raise(new CommitFailedException());
                     return;
+                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
+                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
+                    return;
                 } else {
                     log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
                     future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
@@ -731,6 +735,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         // re-discover the coordinator and retry
                         coordinatorDead();
                         future.raise(error);
+                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 0273bdb..7873028 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -26,7 +26,7 @@ import kafka.utils.ZkUtils._
 import java.util.Random
 import java.util.Properties
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.MetadataResponse
 
@@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities {
           case e2: Throwable => throw new AdminOperationException(e2)
         }
       } else {
-        throw new InvalidTopicException("topic %s to delete does not exist".format(topic))
+        throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index a36a07d..42bfebf 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -127,9 +127,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     //check if there is any Deny acl match that would disallow this operation.
     val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
 
-    //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny.
+    //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny.
     val ops = if (Describe == operation)
-      Set[Operation](operation, Read, Write)
+      Set[Operation](operation, Read, Write, Delete)
     else
       Set[Operation](operation)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 85c47e6..d765c8a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -233,44 +233,48 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responseBody = new OffsetCommitResponse(results.asJava)
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
-      // filter non-existent topics
-      val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
-        !metadataCache.contains(topicPartition.topic)
+      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
+        case (topicPartition, _) => {
+          val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
+          val exists = metadataCache.contains(topicPartition.topic)
+          if (!authorizedForDescribe && exists) 
+              debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+                s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
+          authorizedForDescribe && exists
+        }
       }
-      val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
 
-      val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
-        case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
+      val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
+        case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
       }
 
       // the callback for sending an offset commit response
       def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
-        val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
-
-        mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
-          if (errorCode != Errors.NONE.code) {
-            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-              s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+        val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
+          unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
+          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+
+        if (logger.isDebugEnabled()) //optimizing code as it's a loop
+          combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
+            if (errorCode != Errors.NONE.code) {
+              debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+                s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+            }
           }
-        }
-        val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
-
         val responseHeader = new ResponseHeader(header.correlationId)
         val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
         requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
       }
 
-      if (authorizedRequestInfo.isEmpty)
+      if (authorizedTopics.isEmpty)
         sendResponseCallback(Map.empty)
       else if (header.apiVersion == 0) {
         // for version 0 always store offsets to ZK
-        val responseInfo = authorizedRequestInfo.map {
+        val responseInfo = authorizedTopics.map {
           case (topicPartition, partitionData) =>
             val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
             try {
-              if (!metadataCache.hasTopicMetadata(topicPartition.topic))
-                (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-              else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+              if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
                 (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
               else {
                 zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
@@ -301,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - If v2 we use the default expiration timestamp
         val currentTimestamp = SystemTime.milliseconds
         val defaultExpireTimestamp = offsetRetention + currentTimestamp
-        val partitionData = authorizedRequestInfo.mapValues { partitionData =>
+        val partitionData = authorizedTopics.mapValues { partitionData =>
           val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
             offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
@@ -336,15 +340,22 @@ class KafkaApis(val requestChannel: RequestChannel,
     val produceRequest = request.body.asInstanceOf[ProduceRequest]
     val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.partitionRecords.asScala.partition {
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+    }
+
+    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
       case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
     }
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ =>
-        new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp))
+      val mergedResponseStatus = responseStatus ++ 
+        unauthorizedForWriteRequestInfo.mapValues(_ =>
+           new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ 
+        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => 
+           new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp))
 
       var errorInResponse = false
 
@@ -432,11 +443,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleFetchRequest(request: RequestChannel.Request) {
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition {
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.requestInfo.partition {
+      case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicAndPartition.topic)) && metadataCache.contains(topicAndPartition.topic)
+    }
+
+    val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) =>
+    val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { case (tp, _) =>
+      (tp, FetchResponsePartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MessageSet.Empty))
+    }
+    val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { case (tp, _) =>
       (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty))
     }
 
@@ -466,7 +484,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         } else responsePartitionData
 
-      val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData
+      val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
 
       mergedPartitionData.foreach { case (topicAndPartition, data) =>
         if (data.error != Errors.NONE.code)
@@ -554,7 +572,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
-      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
+      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava)
     )
 
     val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
@@ -605,7 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
-      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
+      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
                                            ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                            ListOffsetResponse.UNKNOWN_OFFSET)
     })
@@ -775,7 +793,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,
             java.util.Collections.emptyList())
         }
       }
@@ -804,25 +822,34 @@ class KafkaApis(val requestChannel: RequestChannel,
           metadataRequest.topics.asScala.toSet
       }
 
-    var (authorizedTopics, unauthorizedTopics) =
+    var (authorizedTopics, unauthorizedForDescribeTopics) =
       topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))
 
+    var unauthorizedForCreateTopics = Set[String]()
+
     if (authorizedTopics.nonEmpty) {
       val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
       if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
-        authorizer.foreach { az =>
-          if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
-            authorizedTopics --= nonExistingTopics
-            unauthorizedTopics ++= nonExistingTopics
-          }
+        if (!authorize(request.session, Create, Resource.ClusterResource)) {
+          authorizedTopics --= nonExistingTopics
+          unauthorizedForCreateTopics ++= nonExistingTopics
         }
       }
     }
 
-    val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
-      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
+    val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
+      new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
         java.util.Collections.emptyList()))
 
+    // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
+    val unauthorizedForDescribeTopicMetadata =
+      // In case of all topics, don't include topics unauthorized for Describe
+      if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
+        Set.empty[MetadataResponse.TopicMetadata]
+      else
+        unauthorizedForDescribeTopics.map(topic =>
+          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
+
     // In version 0, we returned an error when brokers with replicas were unavailable,
     // while in higher versions we simply don't include the broker in the returned broker list
     val errorUnavailableEndpoints = requestVersion == 0
@@ -832,7 +859,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       else
         getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
 
-    val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
+    val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
 
     val brokers = metadataCache.getAliveBrokers
 
@@ -869,16 +896,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
         authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
       }
-      val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
-      val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
       val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+      val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap
 
       if (header.apiVersion == 0) {
         // version 0 reads offsets from ZK
         val responseInfo = authorizedTopicPartitions.map { topicPartition =>
           val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
           try {
-            if (!metadataCache.hasTopicMetadata(topicPartition.topic))
+            if (!metadataCache.contains(topicPartition.topic))
               (topicPartition, unknownTopicPartitionResponse)
             else {
               val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
@@ -1169,21 +1195,17 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest]
 
-    val (authorizedTopics, unauthorizedTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
-      authorize(request.session, Delete, new Resource(auth.Topic, topic))
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
+      authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic)
     )
 
-    val unauthorizedResults = unauthorizedTopics.map ( topic =>
-      // Avoid leaking that the topic exists if the user is not authorized to describe the topic
-      if (authorize(request.session, Describe, new Resource(auth.Topic, topic))) {
-        (topic, Errors.TOPIC_AUTHORIZATION_FAILED)
-      } else {
-        (topic, Errors.INVALID_TOPIC_EXCEPTION)
-      }
-    ).toMap
-
+    val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition( topic =>
+      authorize(request.session, Delete, new Resource(auth.Topic, topic))
+    )
+    
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      val completeResults = results ++ unauthorizedResults
+      val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map( topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+          unauthorizedForDeleteTopics.map( topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
       val respHeader = new ResponseHeader(request.header.correlationId)
       val responseBody = new DeleteTopicsResponse(completeResults.asJava)
       trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
@@ -1196,7 +1218,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.toMap
       sendResponseCallback(results)
     } else {
-      // If no authorized topics return immediatly
+      // If no authorized topics return immediately
       if (authorizedTopics.isEmpty)
         sendResponseCallback(Map())
       else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index f493e7d..feef6ae 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -120,12 +120,6 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def hasTopicMetadata(topic: String): Boolean = {
-    inReadLock(partitionMetadataLock) {
-      cache.contains(topic)
-    }
-  }
-
   def getAllTopics(): Set[String] = {
     inReadLock(partitionMetadataLock) {
       cache.keySet.toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 6d3b098..be41581 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -15,6 +15,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.ExecutionException
+import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
 import kafka.common
@@ -22,7 +23,8 @@ import kafka.common.TopicAndPartition
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@@ -36,6 +38,14 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+import org.apache.kafka.common.KafkaException
+import java.util.HashMap
+import kafka.admin.AdminUtils
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -43,12 +53,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val brokerId: Integer = 0
 
   val topic = "topic"
+  val topicPattern = "topic.*"
   val createTopic = "topic-new"
   val deleteTopic = "topic-delete"
   val part = 0
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
+
   val topicAndPartition = new TopicAndPartition(topic, part)
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
@@ -163,6 +175,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @After
   override def tearDown() = {
     producers.foreach(_.close())
+    consumers.foreach(_.wakeup())
     consumers.foreach(_.close())
     removeAllAcls
     super.tearDown()
@@ -276,14 +289,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
+  /*
+   * Checks that, whether the topic exists or not, unauthorized PRODUCE, FETCH and DELETE_TOPICS requests do not leak the topic's existence
+   */
+  @Test
+  def testAuthorizationWithTopicNotExisting() {
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
+    AdminUtils.deleteTopic(zkUtils, deleteTopic)
+    TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
+    
+    val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.FETCH -> createFetchRequest,
+      ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
+    )
+
+    for ((key, request) <- requestKeyToRequest) {
+      removeAllAcls
+      val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, topicExists = false)
+      for ((resource, acls) <- RequestKeysToAcls(key))
+        addAndVerifyAcls(acls, resource)
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, topicExists = false)
+    }
+  }
+
   @Test
   def testProduceWithNoTopicAccess() {
     try {
       sendRecords(numRecords, tp)
-      fail("sendRecords should have thrown")
+      fail("should have thrown exception")
     } catch {
-      case e: TopicAuthorizationException =>
-        assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
+      case e: TimeoutException => //expected
     }
   }
 
@@ -292,7 +330,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
     try {
       sendRecords(numRecords, tp)
-      fail("sendRecords should have thrown")
+      fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
         assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
@@ -304,7 +342,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
       sendRecords(numRecords, tp)
-      fail("sendRecords should have thrown")
+      fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
         assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
@@ -375,7 +413,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
-  def testConsumeWithNoTopicAccess() {
+  def testConsumeWithoutTopicDescribeAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
@@ -386,7 +424,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(this.consumers.head)
       Assert.fail("should have thrown exception")
     } catch {
-      case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+      case e: KafkaException => //expected
     }
   }
 
@@ -403,7 +441,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       consumeRecords(this.consumers.head)
       Assert.fail("should have thrown exception")
     } catch {
-      case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+      case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
     }
   }
 
@@ -421,7 +459,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
-        assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
+        assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
     }
   }
 
@@ -438,6 +476,125 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testPatternSubscriptionWithNoTopicAccess() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    this.consumers.head.poll(50)
+    assertTrue(this.consumers.head.subscription.isEmpty)
+  }
+
+  @Test
+  def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    val consumer = consumers.head
+    consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    try {
+      consumeRecords(consumer)
+      Assert.fail("Expected TopicAuthorizationException")
+    } catch {
+      case e: TopicAuthorizationException => //expected
+    } 
+
+  }
+
+  @Test
+  def testPatternSubscriptionWithTopicAndGroupRead() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+
+    //create an unmatched topic
+    val unmatchedTopic = "unmatched"
+    TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
+    sendRecords(1, new TopicPartition(unmatchedTopic, part))
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    val consumer = consumers.head
+    consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    consumeRecords(consumer)
+
+    // set the subscription pattern to an internal topic that the consumer has no read permission for, but since
+    // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception
+    // should be thrown
+    consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
+    assertTrue(consumer.poll(50).isEmpty)
+  }
+
+  @Test
+  def testPatternSubscriptionMatchingInternalTopicWithNoPermission() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+
+    val consumerConfig = new Properties
+    consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+    try {
+      consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+      consumeRecords(consumer)
+      assertEquals(Set[String](topic).asJava, consumer.subscription)
+    } finally consumer.close()
+  }
+
+  @Test
+  def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
+
+    val consumerConfig = new Properties
+    consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+    try {
+      consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+      consumeRecords(consumer)
+      Assert.fail("Expected TopicAuthorizationException")
+    } catch {
+      case e: TopicAuthorizationException => //expected
+    } finally consumer.close()
+  }
+
+  @Test
+  def testPatternSubscriptionNotMatchingInternalTopic() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+
+    val consumerConfig = new Properties
+    consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false")
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
+      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
+    try {
+      consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+      consumeRecords(consumer)
+    } finally consumer.close()
+}
+
+  @Test
   def testCreatePermissionNeededToReadFromNonExistentTopic() {
     val newTopic = "newTopic"
     val topicPartition = new TopicPartition(newTopic, 0)
@@ -451,7 +608,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       Assert.fail("should have thrown exception")
     } catch {
       case e: TopicAuthorizationException =>
-        assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics());
+        assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
     }
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
@@ -466,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
   }
 
-  @Test(expected = classOf[TopicAuthorizationException])
+  @Test(expected = classOf[KafkaException])
   def testCommitWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
@@ -512,7 +669,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     this.consumers.head.position(tp)
   }
 
-  @Test(expected = classOf[TopicAuthorizationException])
+  @Test(expected = classOf[KafkaException])
   def testOffsetFetchWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
@@ -537,14 +694,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   @Test
   def testListOffsetsWithNoTopicAccess() {
-    val e = intercept[TopicAuthorizationException] {
-      this.consumers.head.partitionsFor(topic)
-    }
-    assertEquals(Set(topic), e.unauthorizedTopics().asScala)
+    val partitionInfos = this.consumers.head.partitionsFor(topic)
+    assertNull(partitionInfos)
   }
 
   @Test
-  def testListOfsetsWithTopicDescribe() {
+  def testListOffsetsWithTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.partitionsFor(topic)
   }
@@ -554,7 +709,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val deleteResponse = DeleteTopicsResponse.parse(response)
 
-    assertEquals(Errors.INVALID_TOPIC_EXCEPTION, deleteResponse.errors.asScala.head._2)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2)
   }
 
   @Test
@@ -585,24 +740,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys,
                                             request: AbstractRequest,
                                             resources: Set[ResourceType],
-                                            isAuthorized: Boolean): AbstractRequestResponse = {
+                                            isAuthorized: Boolean,
+                                            topicExists: Boolean = true): AbstractRequestResponse = {
     val resp = send(request, apiKey)
     val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
     val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response)
 
     val possibleErrorCodes = resources.flatMap { resourceType =>
-      if(resourceType == Topic)
-        // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names
-        Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code())
+      if (resourceType == Topic)
+          // When completely unauthorized, topic resources must return UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names
+          Seq(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
       else
-        Seq(resourceType.errorCode)
+          Seq(resourceType.errorCode)
     }
 
-    if (isAuthorized)
-      assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
+    if (topicExists)
+      if (isAuthorized)
+        assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
+      else
+        assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
     else
-      assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
-
+      assertEquals(s"${apiKey} - Found error code $errorCode", Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), errorCode) 
+      
     response
   }
 
@@ -634,16 +793,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                              topic: String = topic,
                              part: Int = part) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 50
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala) {
-        records.add(record)
-      }
-      if (iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
-      iters += 1
+
+    val future = Future {
+      while (records.size < numRecords) 
+        for (record <- consumer.poll(50).asScala)
+          records.add(record)
+      records
     }
+    val result = Await.result(future, 10 seconds)
+
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
@@ -652,4 +810,5 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       assertEquals(offset.toLong, record.offset())
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 8edb6f8..2f5858c 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.io.File
 import java.util.ArrayList
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, TimeoutException => JTimeoutException}
 
 import kafka.admin.AclCommand
 import kafka.common.TopicAndPartition
@@ -28,16 +28,19 @@ import kafka.server._
 import kafka.utils._
 
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig}
-import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig}
+import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig, KafkaProducer}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{TopicPartition}
+import org.apache.kafka.common.{TopicPartition,KafkaException}
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException}
+import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException}
 import org.junit.Assert._
 import org.junit.{Test, After, Before}
 
 import scala.collection.JavaConverters._
-
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
 
 /**
   * The test cases here verify that a producer authorized to publish to a topic
@@ -107,6 +110,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
                                           s"--topic=$topic",
                                           s"--producer",
                                           s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def describeAclArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--add",
+                                          s"--topic=$topic",
+                                          s"--operation=Describe",
+                                          s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--remove",
+                                          s"--force",
+                                          s"--topic=$topic",
+                                          s"--operation=Describe",
+                                          s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
+                                          s"zookeeper.connect=$zkConnect",
+                                          s"--remove",
+                                          s"--force",
+                                          s"--topic=$topic",
+                                          s"--operation=Write",
+                                          s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
   def consumeAclArgs: Array[String] = Array("--authorizer-properties",
                                                s"zookeeper.connect=$zkConnect",
                                                s"--add",
@@ -149,18 +172,28 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     }
     super.setUp
     AclCommand.main(topicBrokerReadAclArgs)
-    servers.foreach( s =>
+    servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
-    )
+    }
     // create the test topic with all the brokers as replicas
     TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers)
   }
 
+  override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+    TestUtils.createNewProducer(brokerList,
+                                  maxBlockMs = 5000L,
+                                  securityProtocol = this.securityProtocol,
+                                  trustStoreFile = this.trustStoreFile,
+                                  saslProperties = this.saslProperties,
+                                  props = Some(producerConfig))
+  }
+  
   /**
     * Closes MiniKDC last when tearing down.
     */
   @After
   override def tearDown {
+    consumers.foreach(_.wakeup())
     super.tearDown
     closeSasl()
   }
@@ -187,10 +220,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   private def setAclsAndProduce() {
     AclCommand.main(produceAclArgs)
     AclCommand.main(consumeAclArgs)
-    servers.foreach(s => {
+    servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
-    })
+    }
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)
@@ -203,35 +236,93 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     * isn't set.
     */
   @Test
-  def testNoProduceAcl {
+  def testNoProduceWithoutDescribeAcl {
     //Produce records
     debug("Starting to send records")
     try{
       sendRecords(numRecords, tp)
-      fail("Topic authorization exception expected")
+      fail("exception expected")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case e: TimeoutException => //expected
     }
   }
 
-  /**
+  @Test
+  def testNoProduceWithDescribeAcl {
+    AclCommand.main(describeAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+    }
+    //Produce records
+    debug("Starting to send records")
+    try{
+      sendRecords(numRecords, tp)
+      fail("exception expected")
+    } catch {
+      case e: TopicAuthorizationException => //expected
+    }
+  }
+  
+   /**
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
     */
   @Test
-  def testNoConsumeAcl {
+  def testNoConsumeWithoutDescribeAclViaAssign {
+    noConsumeWithoutDescribeAclSetup
+    consumers.head.assign(List(tp).asJava)
+
+    try {
+      consumeRecords(this.consumers.head)
+      fail("exception expected")
+    } catch {
+       case e: KafkaException => //expected
+    }
+  }
+  
+  @Test
+  def testNoConsumeWithoutDescribeAclViaSubscribe {
+    noConsumeWithoutDescribeAclSetup
+    consumers.head.subscribe(List(topic).asJava)
+
+    try {
+      consumeRecords(this.consumers.head)
+      fail("exception expected")
+    } catch {
+      case e: JTimeoutException => //expected
+    }
+  } 
+  
+  private def noConsumeWithoutDescribeAclSetup {
     AclCommand.main(produceAclArgs)
     AclCommand.main(groupAclArgs)
-    servers.foreach(s => {
+    servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
-    })
+    }
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)
-    //Consume records
+
+    //Deleting topic ACL
+    AclCommand.main(deleteDescribeAclArgs)
+    AclCommand.main(deleteWriteAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+    }
+    
     debug("Finished sending and starting to consume records")
+  }
+ 
+  /**
+    * Tests that a consumer using assign() fails to consume messages when it has
+    * Describe but not Read access on the topic.
+    */
+  @Test
+  def testNoConsumeWithDescribeAclViaAssign {
+    noConsumeWithDescribeAclSetup
     consumers.head.assign(List(tp).asJava)
+
     try {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
@@ -239,6 +330,33 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
       case e: TopicAuthorizationException => //expected
     }
   }
+  
+  @Test
+  def testNoConsumeWithDescribeAclViaSubscribe {
+    noConsumeWithDescribeAclSetup
+    consumers.head.subscribe(List(topic).asJava)
+
+    try {
+      consumeRecords(this.consumers.head)
+      fail("Topic authorization exception expected")
+    } catch {
+      case e: TopicAuthorizationException => //expected
+    }
+  }
+  
+  private def noConsumeWithDescribeAclSetup {
+    AclCommand.main(produceAclArgs) 
+    AclCommand.main(groupAclArgs)
+    servers.foreach { s =>
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
+    }
+    //Produce records
+    debug("Starting to send records")
+    sendRecords(numRecords, tp)
+    //Consume records
+    debug("Finished sending and starting to consume records")
+  }
 
   /**
     * Tests that a consumer fails to consume messages without the appropriate
@@ -247,9 +365,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   @Test
   def testNoGroupAcl {
     AclCommand.main(produceAclArgs)
-    servers.foreach(s =>
+    servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
-    )
+    }
     //Produce records
     debug("Starting to send records")
     sendRecords(numRecords, tp)
@@ -283,22 +401,22 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
                              topic: String = topic,
                              part: Int = part) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 50
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala) {
-        records.add(record)
-      }
-      if (iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
-      iters += 1
+
+    val future = Future {
+      while (records.size < numRecords) 
+        for (record <- consumer.poll(50).asScala)
+          records.add(record)
+      records
     }
+    val result = Await.result(future, 10 seconds)
+
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
       assertEquals(topic, record.topic())
       assertEquals(part, record.partition())
       assertEquals(offset.toLong, record.offset())
-    } 
+    }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 9595ad6..ffca431 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -29,6 +29,8 @@ import kafka.integration.KafkaServerTestHarness
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
+import scala.util.control.Breaks.{breakable, break}
+import java.util.ConcurrentModificationException
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -64,17 +66,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.putAll(consumerSecurityProps)
     for (i <- 0 until producerCount)
-      producers += TestUtils.createNewProducer(brokerList,
-                                               securityProtocol = this.securityProtocol,
-                                               trustStoreFile = this.trustStoreFile,
-                                               saslProperties = this.saslProperties,
-                                               props = Some(producerConfig))
+      producers += createNewProducer
     for (i <- 0 until consumerCount) {
-      consumers += TestUtils.createNewConsumer(brokerList,
-                                               securityProtocol = this.securityProtocol,
-                                               trustStoreFile = this.trustStoreFile,
-                                               saslProperties = this.saslProperties,
-                                               props = Some(consumerConfig))
+      consumers += createNewConsumer
     }
 
     // create the consumer offset topic
@@ -85,10 +79,42 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       servers.head.groupCoordinator.offsetsTopicConfigs)
   }
 
+  //extracted method to allow for different params in some specific tests
+  def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+      TestUtils.createNewProducer(brokerList,
+                                  securityProtocol = this.securityProtocol,
+                                  trustStoreFile = this.trustStoreFile,
+                                  saslProperties = this.saslProperties,
+                                  props = Some(producerConfig))
+  }
+  
+  //extracted method to allow for different params in some specific tests
+  def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+      TestUtils.createNewConsumer(brokerList,
+                                  securityProtocol = this.securityProtocol,
+                                  trustStoreFile = this.trustStoreFile,
+                                  saslProperties = this.saslProperties,
+                                  props = Some(consumerConfig))
+  }
+
   @After
   override def tearDown() {
     producers.foreach(_.close())
-    consumers.foreach(_.close())
+    
+    consumers.foreach { consumer => 
+      breakable {
+        while(true) {
+          try {
+            consumer.close
+            break
+          } catch {
+            //short wait to make sure that a woken-up consumer can be closed without a spurious ConcurrentModificationException
+            case e: ConcurrentModificationException => Thread.sleep(100L)
+          }
+        }
+      }
+    }
+    
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ea5a213..d39de75 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -26,7 +26,7 @@ import org.junit.Test
 import java.util.Properties
 
 import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
 class DeleteTopicTest extends ZooKeeperTestHarness {
 
@@ -206,9 +206,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // start topic deletion
     try {
       AdminUtils.deleteTopic(zkUtils, "test2")
-      fail("Expected InvalidTopicException")
+      fail("Expected UnknownTopicOrPartitionException")
     } catch {
-      case e: InvalidTopicException => // expected exception
+      case e: UnknownTopicOrPartitionException => // expected exception
     }
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index a59316b..e04e1b7 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -58,7 +58,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
 
     // Basic
     validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava, timeout),
-      Map("invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION))
+      Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
     // Partial
     TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers)
@@ -67,7 +67,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
       "partial-invalid-topic").asJava, timeout),
       Map(
         "partial-topic-1" -> Errors.NONE,
-        "partial-invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION
+        "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
       )
     )
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8124f6e0/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 7b16ab0..1b1c593 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,6 +64,8 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
     <li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
     <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
+    <li> When using an Authorizer, if a user does not have <b>Describe</b> authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors
+         but will instead return UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.</li>
 </ul>
 
 <h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>