You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/10/06 20:15:33 UTC

kafka git commit: KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics

Repository: kafka
Updated Branches:
  refs/heads/trunk fb6ca658d -> 10cd98cc8


KAFKA-5547; Return TOPIC_AUTHORIZATION_FAILED error if no describe access for topics

Author: Manikumar Reddy <ma...@gmail.com>

Reviewers: Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3924 from omkreddy/KAFKA-5547-TOPIC-AUTHRO


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10cd98cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10cd98cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10cd98cc

Branch: refs/heads/trunk
Commit: 10cd98cc894b88c5d1e24fc54c66361ad9914df2
Parents: fb6ca65
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Fri Oct 6 12:51:30 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 6 12:51:30 2017 -0700

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |   5 +-
 .../common/requests/OffsetFetchResponse.java    |   2 +
 .../src/main/scala/kafka/server/KafkaApis.scala | 198 ++++++++++---------
 .../kafka/api/AuthorizerIntegrationTest.scala   |  53 ++---
 .../kafka/api/EndToEndAuthorizationTest.scala   |   4 +-
 .../api/SaslEndToEndAuthorizationTest.scala     |   6 +-
 docs/upgrade.html                               |   4 +
 7 files changed, 145 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 38ca041..5482db7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -786,7 +786,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         future.raise(new CommitFailedException());
                         return;
                     } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
+                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                         return;
                     } else {
                         future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
@@ -857,8 +857,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
 
                     if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Partition " + tp + " may not exist or the user may not have " +
-                                "Describe access to the topic"));
+                        future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 4d069fe..e398442 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -85,6 +85,8 @@ public class OffsetFetchResponse extends AbstractResponse {
     public static final String NO_METADATA = "";
     public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
             Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    public static final PartitionData UNAUTHORIZED_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA,
+            Errors.TOPIC_AUTHORIZATION_FAILED);
 
     /**
      * Possible error codes:

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c171aaa..aa00565 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,25 +264,25 @@ class KafkaApis(val requestChannel: RequestChannel,
       }.toMap
       sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava))
     } else {
-      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
-        case (topicPartition, _) =>
-          val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
-          val exists = metadataCache.contains(topicPartition.topic)
-          if (!authorizedForDescribe && exists)
-              debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
-          authorizedForDescribe && exists
-      }
 
-      val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
-        case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+      var unauthorizedTopics = Set[TopicPartition]()
+      var nonExistingTopics = Set[TopicPartition]()
+      var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]()
+
+      for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) {
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+          unauthorizedTopics += topicPartition
+        else if (!metadataCache.contains(topicPartition.topic))
+          nonExistingTopics += topicPartition
+        else
+          authorizedTopics += (topicPartition -> partitionData)
       }
 
       // the callback for sending an offset commit response
       def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
         val combinedCommitStatus = commitStatus ++
-          unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
 
         if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, error) =>
@@ -313,7 +313,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               case e: Throwable => (topicPartition, Errors.forException(e))
             }
         }
-        sendResponseCallback(responseInfo)
+        sendResponseCallback(responseInfo.toMap)
       } else {
         // for version 1 and beyond store offsets in offset manager
 
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           offsetCommitRequest.groupId,
           offsetCommitRequest.memberId,
           offsetCommitRequest.generationId,
-          partitionData,
+          partitionData.toMap,
           sendResponseCallback)
       }
     }
@@ -381,21 +381,25 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
-      produceRequest.partitionRecordsOrFail.asScala.partition { case (tp, _) =>
-        authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
-      }
+    var unauthorizedTopics = Set[TopicPartition]()
+    var nonExistingTopics = Set[TopicPartition]()
+    var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
 
-    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (tp, _) => authorize(request.session, Write, new Resource(Topic, tp.topic))
+    for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
+      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+        unauthorizedTopics += topicPartition
+      else if (!metadataCache.contains(topicPartition.topic))
+        nonExistingTopics += topicPartition
+      else
+        authorizedRequestInfo += (topicPartition -> memoryRecords)
     }
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
 
       val mergedResponseStatus = responseStatus ++
-        unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+        unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+        nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
       var errorInResponse = false
 
@@ -479,21 +483,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     val versionId = request.header.apiVersion
     val clientId = request.header.clientId
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.fetchData.asScala.toSeq.partition {
-      case (tp, _) => authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp.topic)
-    }
+    var unauthorizedTopics = Set[TopicPartition]()
+    var nonExistingTopics = Set[TopicPartition]()
+    var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
 
-    val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
-      case (tp, _) => authorize(request.session, Read, new Resource(Topic, tp.topic))
+    for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) {
+      if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+        unauthorizedTopics += topicPartition
+      else if (!metadataCache.contains(topicPartition.topic))
+        nonExistingTopics += topicPartition
+      else
+        authorizedRequestInfo += (topicPartition -> partitionData)
     }
 
-    val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+    val nonExistingPartitionData = nonExistingTopics.map {
+      case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
         FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
-    val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+    val unauthorizedForReadPartitionData = unauthorizedTopics.map {
+      case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
         FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
 
@@ -538,7 +547,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData
+      val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData
 
       val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
 
@@ -644,7 +653,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
-      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava)
+      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, List[JLong]().asJava)
     )
 
     val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
@@ -697,7 +706,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
-      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
                                            ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                            ListOffsetResponse.UNKNOWN_OFFSET)
     })
@@ -957,7 +966,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         Set.empty[MetadataResponse.TopicMetadata]
       else
         unauthorizedForDescribeTopics.map(topic =>
-          new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))
+          new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, java.util.Collections.emptyList()))
 
     // In version 0, we returned an error when brokers with replicas were unavailable,
     // while in higher versions we simply don't include the broker in the returned broker list
@@ -1029,7 +1038,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             }.toMap
 
-            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
             new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
           } else {
             // versions 1 and above read offsets from Kafka
@@ -1050,7 +1059,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               if (error != Errors.NONE)
                 offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
               else {
-                val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+                val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
                 new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
               }
             }
@@ -1370,11 +1379,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
         controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
-
       }
 
       val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request.")) ++
-        unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session, topic) ) ++
+        unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "The topic authorization is failed.")) ++
         queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
 
       adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly,
@@ -1382,28 +1390,26 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic: String): ApiError = {
-    if (authorize(session, Describe, new Resource(Topic, topic)))
-      new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)
-    else
-      new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null)
-  }
-
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic =>
-      authorize(request.session, Describe, new Resource(Topic, topic)) && metadataCache.contains(topic)
-    }
+    var unauthorizedTopics = Set[String]()
+    var nonExistingTopics = Set[String]()
+    var authorizedForDeleteTopics =  Set[String]()
 
-    val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
-      authorize(request.session, Delete, new Resource(Topic, topic))
+    for (topic <- deleteTopicRequest.topics.asScala) {
+      if (!authorize(request.session, Delete, new Resource(Topic, topic)))
+        unauthorizedTopics += topic
+      else if (!metadataCache.contains(topic))
+        nonExistingTopics += topic
+      else
+        authorizedForDeleteTopics += topic
     }
 
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
-            unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+        val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++
+            nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results
         val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava)
         trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
@@ -1418,12 +1424,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseCallback(results)
     } else {
       // If no authorized topics return immediately
-      if (authorizedTopics.isEmpty)
+      if (authorizedForDeleteTopics.isEmpty)
         sendResponseCallback(Map())
       else {
         adminManager.deleteTopics(
           deleteTopicRequest.timeout.toInt,
-          authorizedTopics,
+          authorizedForDeleteTopics,
           sendResponseCallback
         )
       }
@@ -1433,21 +1439,26 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDeleteRecordsRequest(request: RequestChannel.Request) {
     val deleteRecordsRequest = request.body[DeleteRecordsRequest]
 
-    val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
-      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
-    }
+    var unauthorizedTopics = Set[TopicPartition]()
+    var nonExistingTopics = Set[TopicPartition]()
+    var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]()
 
-    val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
-      case (topicPartition, _) => authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))
+    for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) {
+      if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic)))
+        unauthorizedTopics += topicPartition
+      else if (!metadataCache.contains(topicPartition.topic))
+        nonExistingTopics += topicPartition
+      else
+        authorizedForDeleteTopics += (topicPartition -> offset)
     }
 
     // the callback for sending a DeleteRecordsResponse
     def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
 
       val mergedResponseStatus = responseStatus ++
-        unauthorizedForDeleteTopics.mapValues(_ =>
+        unauthorizedTopics.map(_ ->
           new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
-        nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+        nonExistingTopics.map(_ ->
           new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
       mergedResponseStatus.foreach { case (topicPartition, status) =>
@@ -1646,24 +1657,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     else {
       val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())}
 
-      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
-        partitionsToAdd.asScala.partition { tp =>
-          authorize(request.session, Describe, new Resource(Topic, tp.topic)) && metadataCache.contains(tp)
-        }
+      var unauthorizedTopics = Set[TopicPartition]()
+      var nonExistingTopics = Set[TopicPartition]()
+      var authorizedPartitions =  Set[TopicPartition]()
 
-      val (authorizedPartitions, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { tp =>
-        authorize(request.session, Write, new Resource(Topic, tp.topic))
+      for ( topicPartition <- partitionsToAdd.asScala) {
+        if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
+          unauthorizedTopics += topicPartition
+        else if (!metadataCache.contains(topicPartition.topic))
+          nonExistingTopics += topicPartition
+        else
+          authorizedPartitions += topicPartition
       }
 
-      if (nonExistingOrUnauthorizedForDescribeTopics.nonEmpty
-        || unauthorizedForWriteRequestInfo.nonEmpty
+      if (unauthorizedTopics.nonEmpty
+        || nonExistingTopics.nonEmpty
         || internalTopics.nonEmpty) {
 
         // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
         // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
         // the authorization check to indicate that they were not added to the transaction.
-        val partitionErrors = (unauthorizedForWriteRequestInfo.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingOrUnauthorizedForDescribeTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
+        val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++
           internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
           authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap
 
@@ -1734,26 +1749,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId)))
       sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
     else {
-      val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = txnOffsetCommitRequest.offsets.asScala.toMap.partition {
-        case (topicPartition, _) =>
-          val authorizedForDescribe = authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
-          val exists = metadataCache.contains(topicPartition.topic)
-          if (!authorizedForDescribe && exists)
-              debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning " +
-                s"${Errors.UNKNOWN_TOPIC_OR_PARTITION.name}")
-          authorizedForDescribe && exists
-      }
-
-      val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition {
-        case (topicPartition, _) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+      var unauthorizedTopics = Set[TopicPartition]()
+      var nonExistingTopics = Set[TopicPartition]()
+      var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]()
+
+      for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) {
+        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+          unauthorizedTopics += topicPartition
+        else if (!metadataCache.contains(topicPartition.topic))
+          nonExistingTopics += topicPartition
+        else
+          authorizedTopics += (topicPartition -> commitedOffset)
       }
 
       // the callback for sending an offset commit response
       def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) {
         val combinedCommitStatus = commitStatus ++
-          unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
-          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
 
         if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, error) =>
@@ -1769,7 +1782,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopics.isEmpty)
         sendResponseCallback(Map.empty)
       else {
-        val offsetMetadata = convertTxnOffsets(authorizedTopics)
+        val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap)
         groupCoordinator.handleTxnCommitOffsets(
           txnOffsetCommitRequest.consumerGroupId,
           txnOffsetCommitRequest.producerId,
@@ -1942,12 +1955,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = {
     val error = resource.`type` match {
       case RResourceType.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
-      case RResourceType.TOPIC =>
-        // Don't leak topic name unless the user has describe topic permission
-        if (authorize(session, Describe, new Resource(Topic, resource.name)))
-          Errors.TOPIC_AUTHORIZATION_FAILED
-        else
-          Errors.UNKNOWN_TOPIC_OR_PARTITION
+      case RResourceType.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
       case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
     }
     new ApiError(error, null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d07d08e..522fcd3 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -382,6 +382,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def describeLogDirsRequest = new DescribeLogDirsRequest.Builder(Collections.singleton(tp)).build()
 
+  private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build()
+
+  private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build()
+
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -413,26 +417,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
       ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
-      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
+      ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls()
       val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
 
       val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized =  describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
-        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
+        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized)
         removeAllAcls()
       }
 
       for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
     }
   }
 
@@ -447,7 +453,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
-      ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
       ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
       ApiKeys.PRODUCE -> createProduceRequest,
       ApiKeys.FETCH -> createFetchRequest,
@@ -455,26 +460,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
       ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
-      ApiKeys.DELETE_RECORDS -> deleteRecordsRequest
+      ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
+      ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
+      ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls()
       val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false, topicExists = false)
 
       val resourceToAcls = requestKeysToAcls(key)
       resourceToAcls.get(topicResource).foreach { acls =>
         val describeAcls = topicDescribeAcl(topicResource)
         val isAuthorized = describeAcls == acls
         addAndVerifyAcls(describeAcls, topicResource)
-        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
+        sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, topicExists = false)
         removeAllAcls()
       }
 
       for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
+      sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true, topicExists = false)
     }
   }
 
@@ -484,7 +492,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       sendRecords(numRecords, tp)
       fail("should have thrown exception")
     } catch {
-      case _: TimeoutException => //expected
+      case _: TopicAuthorizationException => //expected
     }
   }
 
@@ -534,8 +542,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     sendRecords(numRecords, topicPartition)
   }
 
-  @Test(expected = classOf[GroupAuthorizationException])
-  def testConsumeWithNoAccess(): Unit = {
+  @Test(expected = classOf[TopicAuthorizationException])
+  def testConsumeUsingAssignWithNoAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
@@ -893,10 +901,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     this.consumers.head.position(tp)
   }
 
-  @Test
+  @Test(expected = classOf[TopicAuthorizationException])
   def testListOffsetsWithNoTopicAccess() {
-    val partitionInfos = this.consumers.head.partitionsFor(topic)
-    assertNull(partitionInfos)
+    this.consumers.head.partitionsFor(topic)
   }
 
   @Test
@@ -935,7 +942,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
     val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2)
   }
 
   @Test
@@ -963,7 +970,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
     val version = ApiKeys.DELETE_RECORDS.latestVersion
     val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteRecordsResponse.responses.asScala.head._2.error)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteRecordsResponse.responses.asScala.head._2.error)
   }
 
   @Test
@@ -990,7 +997,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
     val version = ApiKeys.CREATE_PARTITIONS.latestVersion
     val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
-    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error)
+    assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, createPartitionsResponse.errors.asScala.head._2.error)
   }
 
   @Test
@@ -1240,7 +1247,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                                         request: AbstractRequest,
                                         resources: Set[ResourceType],
                                         isAuthorized: Boolean,
-                                        isAuthorizedTopicDescribe: Boolean,
                                         topicExists: Boolean = true): AbstractResponse = {
     val resp = connectAndSend(request, apiKey)
     val response = requestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer], classOf[Short]).invoke(
@@ -1251,8 +1257,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       if (resourceType == Topic) {
         if (isAuthorized)
           Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
-        else if (!isAuthorizedTopicDescribe)
-          Set(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           Set(Topic.error)
       } else {
@@ -1266,9 +1270,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       else
         assertTrue(s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors", authorizationErrors.contains(error))
     else if (resources == Set(Topic))
-      assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
-    else
-      assertNotEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
+      if (isAuthorized)
+        assertEquals(s"$apiKey had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
+      else
+        assertEquals(s"$apiKey had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
 
     response
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index bbb3249..720d8b6 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -214,7 +214,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     * Tests that a producer fails to publish messages when the appropriate ACL
     * isn't set.
     */
-  @Test(expected = classOf[TimeoutException])
+  @Test(expected = classOf[TopicAuthorizationException])
   def testNoProduceWithoutDescribeAcl(): Unit = {
     sendRecords(numRecords, tp)
   }
@@ -246,7 +246,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(this.consumers.head)
   }
   
-  @Test(expected = classOf[TimeoutException])
+  @Test(expected = classOf[TopicAuthorizationException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
     consumers.head.subscribe(List(topic).asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index a366b1d..fb6bee8 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.errors.GroupAuthorizationException
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.errors.TopicAuthorizationException
 import org.junit.{Before, Test}
 
 import scala.collection.immutable.List
@@ -77,9 +77,9 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
 
     try {
       consumeRecords(consumer2)
-      fail("Expected exception as consumer2 has no access to group")
+      fail("Expected exception as consumer2 has no access to topic")
     } catch {
-      case _: GroupAuthorizationException => //expected
+      case _: TopicAuthorizationException => //expected
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/10cd98cc/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5872c7c..862dadb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -86,6 +86,10 @@
         inversion bug, it was previously enabled by default and disabled if <code>kafka_mx4jenable</code> was set to <code>true</code>.</li>
     <li>The package <code>org.apache.kafka.common.security.auth</code> in the clients jar has been made public and added to the javadocs.
         Internal classes which had previously been located in this package have been moved elsewhere.</li>
+    <li>When using an Authorizer and a user doesn't have required permissions on a topic, the broker
+        will return TOPIC_AUTHORIZATION_FAILED errors to requests irrespective of topic existence on broker.
+        If the user have required permissions and the topic doesn't exists, then the UNKNOWN_TOPIC_OR_PARTITION
+        error code will be returned. </li>
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>