You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/04 20:02:42 UTC

[2/2] kafka git commit: KAFKA-2691: Improve handling of authorization failure during metadata refresh

KAFKA-2691: Improve handling of authorization failure during metadata refresh

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jun Rao

Closes #394 from hachikuji/KAFKA-2691


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c39e79bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c39e79bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c39e79bb

Branch: refs/heads/trunk
Commit: c39e79bb5af5b4e56bec358f8ec3758e6822dbcf
Parents: c30ee50
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 4 11:02:30 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Nov 4 11:02:30 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |   6 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  28 +-
 .../consumer/internals/AbstractCoordinator.java |  39 +-
 .../consumer/internals/ConsumerCoordinator.java |  29 +-
 .../clients/consumer/internals/Fetcher.java     |  29 +-
 .../kafka/clients/producer/KafkaProducer.java   |  30 +-
 .../clients/producer/internals/Sender.java      |   3 +
 .../java/org/apache/kafka/common/Cluster.java   |  32 +-
 .../common/errors/AuthorizationException.java   |   2 +
 .../errors/GroupAuthorizationException.java     |  27 +
 .../errors/TopicAuthorizationException.java     |  33 ++
 .../apache/kafka/common/protocol/Errors.java    |  12 +-
 .../requests/GroupCoordinatorResponse.java      |   9 +
 .../common/requests/HeartbeatResponse.java      |   4 +-
 .../common/requests/JoinGroupResponse.java      |   3 +-
 .../common/requests/LeaveGroupResponse.java     |   1 +
 .../kafka/common/requests/MetadataRequest.java  |  16 +-
 .../kafka/common/requests/MetadataResponse.java |  18 +-
 .../common/requests/OffsetCommitResponse.java   |   6 +-
 .../common/requests/OffsetFetchResponse.java    |   4 +-
 .../common/requests/SyncGroupResponse.java      |   4 +-
 .../org/apache/kafka/clients/MetadataTest.java  |  12 +-
 .../internals/ConsumerCoordinatorTest.java      |  20 +-
 .../clients/consumer/internals/FetcherTest.java |  18 +
 .../clients/producer/MockProducerTest.java      |   3 +-
 .../internals/DefaultPartitionerTest.java       |   3 +-
 .../internals/RecordAccumulatorTest.java        |   4 +-
 .../common/requests/RequestResponseTest.java    |   3 +-
 .../java/org/apache/kafka/test/TestUtils.java   |   3 +-
 .../runtime/distributed/WorkerCoordinator.java  |   1 -
 .../kafka/common/AuthorizationException.scala   |  13 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |  11 +-
 .../kafka/security/auth/ResourceType.scala      |   7 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  54 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 508 +++++++++++++------
 .../processor/DefaultPartitionGrouperTest.java  |   3 +-
 .../processor/internals/StreamThreadTest.java   |   2 +-
 37 files changed, 716 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f2fca12..7e77c18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -238,14 +238,18 @@ public final class Metadata {
     }
 
     private Cluster getClusterForCurrentTopics(Cluster cluster) {
+        Set<String> unauthorizedTopics = new HashSet<>();
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         if (cluster != null) {
+            unauthorizedTopics.addAll(cluster.unauthorizedTopics());
+            unauthorizedTopics.retainAll(this.topics);
+
             for (String topic : this.topics) {
                 partitionInfos.addAll(cluster.partitionsForTopic(topic));
             }
             nodes = cluster.nodes();
         }
-        return new Cluster(nodes, partitionInfos);
+        return new Cluster(nodes, partitionInfos, unauthorizedTopics);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cce13dd..a6be519 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -544,7 +544,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metricGrpPrefix,
                     metricsTags,
                     this.time,
-                    requestTimeoutMs,
                     retryBackoffMs,
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
@@ -777,10 +776,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic
      *             offset reset policy has been configured.
      * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
-     *         the defaultResetPolicy is NONE
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
-     *
-     * @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read permission on topic.
+     *             the defaultResetPolicy is NONE
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
+     *             topics or to the configured groupId
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -883,7 +883,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * encountered (in which case it is thrown to the caller).
      *
      * @param offsets A map of offsets by partition with associated metadata
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1008,7 +1011,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The offset
      * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
      *             available.
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     *
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId
      */
     public long position(TopicPartition partition) {
         acquire();
@@ -1035,7 +1042,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no prior commit
-     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1160,7 +1170,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
-     * The thread which is blocking in an operation will throw {@link WakeupException}.
+     * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
      */
     @Override
     public void wakeup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9cf825c..781ff78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -88,7 +89,6 @@ public abstract class AbstractCoordinator {
     protected final ConsumerNetworkClient client;
     protected final Time time;
     protected final long retryBackoffMs;
-    protected final long requestTimeoutMs;
 
     private boolean needsJoinPrepare = true;
     private boolean rejoinNeeded = true;
@@ -108,7 +108,6 @@ public abstract class AbstractCoordinator {
                                String metricGrpPrefix,
                                Map<String, String> metricTags,
                                Time time,
-                               long requestTimeoutMs,
                                long retryBackoffMs) {
         this.client = client;
         this.time = time;
@@ -120,7 +119,6 @@ public abstract class AbstractCoordinator {
         this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
         this.heartbeatTask = new HeartbeatTask();
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
-        this.requestTimeoutMs = requestTimeoutMs;
         this.retryBackoffMs = retryBackoffMs;
     }
 
@@ -178,7 +176,7 @@ public abstract class AbstractCoordinator {
     public void ensureCoordinatorKnown() {
         while (coordinatorUnknown()) {
             RequestFuture<Void> future = sendGroupMetadataRequest();
-            client.poll(future, requestTimeoutMs);
+            client.poll(future);
 
             if (future.failed()) {
                 if (future.isRetriable())
@@ -376,6 +374,8 @@ public abstract class AbstractCoordinator {
                 log.error("Attempt to join group {} failed due to: {}",
                         groupId, error.exception().getMessage());
                 future.raise(error);
+            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                future.raise(new GroupAuthorizationException(groupId));
             } else {
                 // unexpected error, throw the exception
                 future.raise(new KafkaException("Unexpected error in join group response: "
@@ -427,6 +427,8 @@ public abstract class AbstractCoordinator {
             if (errorCode == Errors.NONE.code()) {
                 future.complete(syncResponse.memberAssignment());
                 sensors.syncLatency.record(response.requestLatencyMs());
+            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                future.raise(new GroupAuthorizationException(groupId));
             } else {
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.forCode(errorCode));
@@ -476,7 +478,8 @@ public abstract class AbstractCoordinator {
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
+            short errorCode = groupCoordinatorResponse.errorCode();
+            if (errorCode == Errors.NONE.code()) {
                 this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                         groupCoordinatorResponse.node().host(),
                         groupCoordinatorResponse.node().port());
@@ -487,8 +490,10 @@ public abstract class AbstractCoordinator {
                 if (generation > 0)
                     heartbeatTask.reset();
                 future.complete(null);
+            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                future.raise(new GroupAuthorizationException(groupId));
             } else {
-                future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
+                future.raise(Errors.forCode(errorCode));
             }
         }
     }
@@ -538,31 +543,33 @@ public abstract class AbstractCoordinator {
         @Override
         public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
-            short error = heartbeatResponse.errorCode();
-            if (error == Errors.NONE.code()) {
+            short errorCode = heartbeatResponse.errorCode();
+            if (errorCode == Errors.NONE.code()) {
                 log.debug("Received successful heartbeat response.");
                 future.complete(null);
-            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
-                    || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                    || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                 log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                 coordinatorDead();
-                future.raise(Errors.forCode(error));
-            } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+                future.raise(Errors.forCode(errorCode));
+            } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
                 log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
-            } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+            } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
                 log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.ILLEGAL_GENERATION);
-            } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
+            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                 log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
+            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                future.raise(new GroupAuthorizationException(groupId));
             } else {
-                future.raise(new KafkaException("Unexpected error in heartbeat response: "
-                        + Errors.forCode(error).exception().getMessage()));
+                future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
+                        + Errors.forCode(errorCode).exception().getMessage()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5b5d6ff..c7323cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,6 +15,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -82,7 +84,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                                String metricGrpPrefix,
                                Map<String, String> metricTags,
                                Time time,
-                               long requestTimeoutMs,
                                long retryBackoffMs,
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
@@ -95,7 +96,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                 metricGrpPrefix,
                 metricTags,
                 time,
-                requestTimeoutMs,
                 retryBackoffMs);
         this.metadata = metadata;
 
@@ -136,6 +136,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         this.metadata.addListener(new Metadata.Listener() {
             @Override
             public void onMetadataUpdate(Cluster cluster) {
+                // if we encounter any unauthorized topics, raise an exception to the user
+                if (!cluster.unauthorizedTopics().isEmpty())
+                    throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+
                 if (subscriptions.hasPatternSubscription()) {
                     final List<String> topicsToSubscribe = new ArrayList<>();
 
@@ -340,13 +344,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future);
 
-            if (future.succeeded()) {
+            if (future.succeeded())
                 return;
-            }
 
-            if (!future.isRetriable()) {
+            if (!future.isRetriable())
                 throw future.exception();
-            }
 
             Utils.sleep(retryBackoffMs);
         }
@@ -439,6 +441,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         @Override
         public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
             sensors.commitLatency.record(response.requestLatencyMs());
+            Set<String> unauthorizedTopics = new HashSet<>();
+
             for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
                 OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
@@ -450,6 +454,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                     if (subscriptions.isAssigned(tp))
                         // update the local cache only if the partition is still assigned
                         subscriptions.committed(tp, offsetAndMetadata);
+                } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+                    future.raise(new GroupAuthorizationException(groupId));
+                    return;
+                } else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+                    unauthorizedTopics.add(tp.topic());
                 } else {
                     if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
                         // just retry
@@ -458,7 +467,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                             || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                         coordinatorDead();
                     } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
-                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                            || errorCode == Errors.ILLEGAL_GENERATION.code()
+                            || errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
                         // need to re-join group
                         subscriptions.needReassignment();
                     }
@@ -473,7 +483,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                 }
             }
 
-            future.complete(null);
+            if (!unauthorizedTopics.isEmpty())
+                future.raise(new TopicAuthorizationException(unauthorizedTopics));
+            else
+                future.complete(null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4e0d5ec..7c5bca6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -78,7 +78,7 @@ public class Fetcher<K, V> {
     private final Deserializer<V> valueDeserializer;
 
     private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
-    private final Set<TopicPartition> unauthorizedTopicPartitions;
+    private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, Long> recordTooLargePartitions;
 
     public Fetcher(ConsumerNetworkClient client,
@@ -110,7 +110,7 @@ public class Fetcher<K, V> {
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
         this.offsetOutOfRangePartitions = new HashMap<>();
-        this.unauthorizedTopicPartitions = new HashSet<>();
+        this.unauthorizedTopics = new HashSet<>();
         this.recordTooLargePartitions = new HashMap<>();
 
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
@@ -302,19 +302,18 @@ public class Fetcher<K, V> {
     }
 
     /**
-     * If any topic from previous fetchResponse contatains Authorization error, throw ApiException.
-     * @throws ApiException
+     * If any topic from previous fetchResponse contains an Authorization error, raise an exception
+     * @throws TopicAuthorizationException
      */
-    private void throwIfUnauthorized() throws ApiException {
-        if (!unauthorizedTopicPartitions.isEmpty()) {
-            StringBuilder sb = new StringBuilder();
-            for (TopicPartition topicPartition : unauthorizedTopicPartitions)
-                sb.append(topicPartition + ",");
-            unauthorizedTopicPartitions.clear();
-            throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString()));
+    private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
+        if (!unauthorizedTopics.isEmpty()) {
+            Set<String> topics = new HashSet<>(unauthorizedTopics);
+            unauthorizedTopics.clear();
+            throw new TopicAuthorizationException(topics);
         }
     }
-     /**
+
+    /**
      * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
      *
      * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
@@ -346,7 +345,7 @@ public class Fetcher<K, V> {
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
             throwIfOffsetOutOfRange();
-            throwIfUnauthorized();
+            throwIfUnauthorizedTopics();
             throwIfRecordTooLarge();
 
             for (PartitionRecords<K, V> part : this.records) {
@@ -557,9 +556,9 @@ public class Fetcher<K, V> {
                     else
                         this.offsetOutOfRangePartitions.put(tp, fetchOffset);
                     log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
-                } else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) {
+                } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                     log.warn("Not authorized to read from topic {}.", tp.topic());
-                    unauthorizedTopicPartitions.add(tp);
+                    unauthorizedTopics.add(tp.topic());
                 } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                     log.warn("Unknown error fetching data for topic-partition {}", tp);
                 } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ff3bfe6..21d635b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -473,21 +474,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (!this.metadata.containsTopic(topic))
             this.metadata.add(topic);
 
-        if (metadata.fetch().partitionsForTopic(topic) != null) {
+        if (metadata.fetch().partitionsForTopic(topic) != null)
             return;
-        } else {
-            long begin = time.milliseconds();
-            long remainingWaitMs = maxWaitMs;
-            while (metadata.fetch().partitionsForTopic(topic) == null) {
-                log.trace("Requesting metadata update for topic {}.", topic);
-                int version = metadata.requestUpdate();
-                sender.wakeup();
-                metadata.awaitUpdate(version, remainingWaitMs);
-                long elapsed = time.milliseconds() - begin;
-                if (elapsed >= maxWaitMs)
-                    throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-                remainingWaitMs = maxWaitMs - elapsed;
-            }
+
+        long begin = time.milliseconds();
+        long remainingWaitMs = maxWaitMs;
+        while (metadata.fetch().partitionsForTopic(topic) == null) {
+            log.trace("Requesting metadata update for topic {}.", topic);
+            int version = metadata.requestUpdate();
+            sender.wakeup();
+            metadata.awaitUpdate(version, remainingWaitMs);
+            long elapsed = time.milliseconds() - begin;
+            if (elapsed >= maxWaitMs)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            if (metadata.fetch().unauthorizedTopics().contains(topic))
+                throw new TopicAuthorizationException(topic);
+            remainingWaitMs = maxWaitMs - elapsed;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 56be021..cada626 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -288,6 +289,8 @@ public class Sender implements Runnable {
                      error);
             this.accumulator.reenqueue(batch, now);
             this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+            batch.done(baseOffset, new TopicAuthorizationException(batch.topicPartition.topic()));
         } else {
             // tell the user the result of their request
             batch.done(baseOffset, error.exception());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index e6a2e43..c7f3e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -23,6 +23,7 @@ import java.util.*;
 public final class Cluster {
 
     private final List<Node> nodes;
+    private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
     private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -34,26 +35,28 @@ public final class Cluster {
      * @param nodes The nodes in the cluster
      * @param partitions Information about a subset of the topic-partitions this cluster hosts
      */
-    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
+    public Cluster(Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics) {
         // make a randomized, unmodifiable copy of the nodes
-        List<Node> copy = new ArrayList<Node>(nodes);
+        List<Node> copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);
         
-        this.nodesById = new HashMap<Integer, Node>();
+        this.nodesById = new HashMap<>();
         for (Node node: nodes)
             this.nodesById.put(node.id(), node);
 
         // index the partitions by topic/partition for quick lookup
-        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
+        this.partitionsByTopicPartition = new HashMap<>(partitions.size());
         for (PartitionInfo p : partitions)
             this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
 
         // index the partitions by topic and node respectively, and make the lists
         // unmodifiable so we can hand them out in user-facing apis without risk
         // of the client modifying the contents
-        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
-        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
+        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
         for (Node n : this.nodes) {
             partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
         }
@@ -68,30 +71,31 @@ public final class Cluster {
                 psNode.add(p);
             }
         }
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
-        this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        this.partitionsByTopic = new HashMap<>(partsForTopic.size());
+        this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size());
         for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
             String topic = entry.getKey();
             List<PartitionInfo> partitionList = entry.getValue();
             this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
-            List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
+            List<PartitionInfo> availablePartitions = new ArrayList<>();
             for (PartitionInfo part : partitionList) {
                 if (part.leader() != null)
                     availablePartitions.add(part);
             }
             this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
         }
-        this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+        this.partitionsByNode = new HashMap<>(partsForNode.size());
         for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
             this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
 
+        this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
     }
 
     /**
      * Create an empty cluster instance with no nodes and no topic-partitions.
      */
     public static Cluster empty() {
-        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
+        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
     }
 
     /**
@@ -104,7 +108,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
+        return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
     }
 
     /**
@@ -190,6 +194,10 @@ public final class Cluster {
         return this.partitionsByTopic.keySet();
     }
 
+    public Set<String> unauthorizedTopics() {
+        return unauthorizedTopics;
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
index 2a01e5e..7fc932d 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -13,7 +13,9 @@
 package org.apache.kafka.common.errors;
 
 public class AuthorizationException extends ApiException {
+
     public AuthorizationException(String message) {
         super(message);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
new file mode 100644
index 0000000..3a767aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class GroupAuthorizationException extends AuthorizationException {
+    private final String groupId;
+
+    public GroupAuthorizationException(String groupId) {
+        super("Not authorized to access group: " + groupId);
+        this.groupId = groupId;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
new file mode 100644
index 0000000..b5d33b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class TopicAuthorizationException extends AuthorizationException {
+    private final Set<String> unauthorizedTopics;
+
+    public TopicAuthorizationException(Set<String> unauthorizedTopics) {
+        super("Not authorized to access topics: " + unauthorizedTopics);
+        this.unauthorizedTopics = unauthorizedTopics;
+    }
+
+    public TopicAuthorizationException(String unauthorizedTopic) {
+        this(Collections.singleton(unauthorizedTopic));
+    }
+
+    public Set<String> unauthorizedTopics() {
+        return unauthorizedTopics;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d4eb1f9..516e50b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -84,12 +84,16 @@ public enum Errors {
             new UnknownMemberIdException("The coordinator is not aware of this member.")),
     INVALID_SESSION_TIMEOUT(26,
             new ApiException("The session timeout is not within an acceptable range.")),
+    REBALANCE_IN_PROGRESS(27,
+            new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
-    AUTHORIZATION_FAILED(29,
-            new ApiException("Request is not authorized.")),
-    REBALANCE_IN_PROGRESS(30,
-            new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
+    TOPIC_AUTHORIZATION_FAILED(29,
+            new AuthorizationException("Topic authorization failed.")),
+    GROUP_AUTHORIZATION_FAILED(30,
+            new AuthorizationException("Group authorization failed.")),
+    CLUSTER_AUTHORIZATION_FAILED(31,
+            new AuthorizationException("Cluster authorization failed."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index c28de70..3fe014d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -26,6 +26,15 @@ public class GroupCoordinatorResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String COORDINATOR_KEY_NAME = "coordinator";
 
+    /**
+     * Possible error codes:
+     *
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
+     * GROUP_AUTHORIZATION_FAILED (30)
+     */
+
+
     // coordinator level field names
     private static final String NODE_ID_KEY_NAME = "node_id";
     private static final String HOST_KEY_NAME = "host";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 48cb4c0..a595efb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -25,12 +25,14 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
+     * REBALANCE_IN_PROGRESS (27)
+     * GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private final short errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 0615e5e..c869b1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -29,7 +29,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
      * GROUP_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
@@ -37,6 +37,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
      * INCONSISTENT_GROUP_PROTOCOL (23)
      * UNKNOWN_MEMBER_ID (25)
      * INVALID_SESSION_TIMEOUT (26)
+     * GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private static final String GENERATION_ID_KEY_NAME = "generation_id";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 278e3e8..e0ca117 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -30,6 +30,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse {
      * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR_FOR_CONSUMER (16)
      * UNKNOWN_CONSUMER_ID (25)
+     * GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private final short errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f70e8da..cc1771b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,12 +12,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -27,6 +21,13 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class MetadataRequest extends AbstractRequest {
     
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
@@ -56,7 +57,8 @@ public class MetadataRequest extends AbstractRequest {
             topicErrors.put(topic, Errors.forException(e));
         }
 
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+        Cluster cluster = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet());
         switch (versionId) {
             case 0:
                 return new MetadataResponse(cluster, topicErrors);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c8f2d08..eb163dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -14,9 +14,12 @@ package org.apache.kafka.common.requests;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -159,8 +162,21 @@ public class MetadataResponse extends AbstractRequestResponse {
                 errors.put(topic, Errors.forCode(topicError));
             }
         }
+
         this.errors = errors;
-        this.cluster = new Cluster(brokers.values(), partitions);
+        this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors));
+    }
+
+    private Set<String> unauthorizedTopics(Map<String, Errors> topicErrors) {
+        if (topicErrors.isEmpty())
+            return Collections.emptySet();
+
+        Set<String> unauthorizedTopics = new HashSet<>();
+        for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
+            if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED)
+                unauthorizedTopics.add(topicErrorEntry.getKey());
+        }
+        return unauthorizedTopics;
     }
 
     public Map<String, Errors> errors() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index baea6f9..4d10a91 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -39,7 +39,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
      * OFFSET_METADATA_TOO_LARGE (12)
      * GROUP_LOAD_IN_PROGRESS (14)
@@ -47,8 +47,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
      * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
-     * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
+     * REBALANCE_IN_PROGRESS (27)
      * INVALID_COMMIT_OFFSET_SIZE (28)
+     * TOPIC_AUTHORIZATION_FAILED (29)
+     * GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private final Map<TopicPartition, Short> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index afc5618..88d68ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -45,13 +45,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     public static final String NO_METADATA = "";
 
     /**
-     * Possible error code:
+     * Possible error codes:
      *
      *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
      *  GROUP_LOAD_IN_PROGRESS (14)
      *  NOT_COORDINATOR_FOR_GROUP (16)
      *  ILLEGAL_GENERATION (22)
      *  UNKNOWN_MEMBER_ID (25)
+     *  TOPIC_AUTHORIZATION_FAILED (29)
+     *  GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private final Map<TopicPartition, PartitionData> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 0eb92f2..7256bd2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -36,8 +36,8 @@ public class SyncGroupResponse extends AbstractRequestResponse {
      * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
-     * REBALANCE_IN_PROGRESS (30)
-     *
+     * REBALANCE_IN_PROGRESS (27)
+     * GROUP_AUTHORIZATION_FAILED (30)
      */
 
     private final short errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 55d7608..bd3bbe8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -128,7 +128,8 @@ public class MetadataTest {
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null))),
+                    new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet()),
             100);
 
         assertArrayEquals("Metadata got updated with wrong set of topics.",
@@ -154,7 +155,8 @@ public class MetadataTest {
                 Arrays.asList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null))),
+                    new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet()),
             100);
 
         assertEquals("Listener did not update topics list correctly",
@@ -179,7 +181,8 @@ public class MetadataTest {
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null))),
+                    new PartitionInfo("topic1", 0, null, null, null)),
+                Collections.<String>emptySet()),
             100);
 
         metadata.removeListener(listener);
@@ -188,7 +191,8 @@ public class MetadataTest {
                 Arrays.asList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic2", 0, null, null, null),
-                    new PartitionInfo("topic3", 0, null, null, null))),
+                    new PartitionInfo("topic3", 0, null, null, null)),
+                Collections.<String>emptySet()),
             100);
 
         assertEquals("Listener did not update topics list correctly",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8667f22..7fd6d88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -114,7 +115,6 @@ public class ConsumerCoordinatorTest {
                 "consumer" + groupId,
                 metricTags,
                 time,
-                requestTimeoutMs,
                 retryBackoffMs,
                 defaultOffsetCommitCallback,
                 autoCommitEnabled,
@@ -144,6 +144,24 @@ public class ConsumerCoordinatorTest {
         assertTrue(future.succeeded());
     }
 
+    @Test(expected = GroupAuthorizationException.class)
+    public void testGroupDescribeUnauthorized() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
+        coordinator.ensureCoordinatorKnown();
+    }
+
+    @Test(expected = GroupAuthorizationException.class)
+    public void testGroupReadUnauthorized() {
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
+                Errors.GROUP_AUTHORIZATION_FAILED.code()));
+        coordinator.ensurePartitionAssignment();
+    }
+
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1e7a215..8711830 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -147,6 +148,23 @@ public class FetcherTest {
     }
 
     @Test
+    public void testUnauthorizedTopic() {
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        // initiate a fetch and have the response fail with TOPIC_AUTHORIZATION_FAILED
+        fetcher.initFetches(cluster);
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have thrown");
+        } catch (TopicAuthorizationException e) {
+            assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+        }
+    }
+
+    @Test
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(Arrays.asList(topicName), listener);
         subscriptions.assignFromSubscribed(Arrays.asList(tp));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 7a46c56..0a0bdd8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -58,7 +59,7 @@ public class MockProducerTest {
     public void testPartitioner() throws Exception {
         PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
         PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
-        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1));
+        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
         MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
         Future<RecordMetadata> metadata = producer.send(record);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index 977fa93..7a5cef6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.kafka.clients.producer.Partitioner;
@@ -36,7 +37,7 @@ public class DefaultPartitionerTest {
     private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
                                                     new PartitionInfo(topic, 2, node1, nodes, nodes),
                                                     new PartitionInfo(topic, 0, node0, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet());
 
     @Test
     public void testKeyPartitionIsStable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 132b83b..4674a91 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -63,7 +63,7 @@ public class RecordAccumulatorTest {
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
+    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
     private final long maxBlockTimeMs = 1000;
@@ -314,7 +314,7 @@ public class RecordAccumulatorTest {
         accum.append(tp1, key, value, null, 0);
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());
         now = time.milliseconds();
         List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
         assertEquals(1, expiredBatches.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 761b9db..5ee11d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -254,7 +254,8 @@ public class RequestResponseTest {
         replicas[0] = node;
         Node[] isr = new Node[1];
         isr[0] = node;
-        Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
+        Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)),
+                Collections.<String>emptySet());
 
         Map<String, Errors> errors = new HashMap<String, Errors>();
         errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ccf3a5f..0fb59e6 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
@@ -54,7 +55,7 @@ public class TestUtils {
         List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
         for (int i = 0; i < partitions; i++)
             parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
-        return new Cluster(asList(ns), parts);
+        return new Cluster(asList(ns), parts, Collections.<String>emptySet());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index 9fdbac7..d11165c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -81,7 +81,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                 metricGrpPrefix,
                 metricTags,
                 time,
-                requestTimeoutMs,
                 retryBackoffMs);
         this.restUrl = restUrl;
         this.configStorage = configStorage;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/common/AuthorizationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala
index 12ee0fe..55919a5 100644
--- a/core/src/main/scala/kafka/common/AuthorizationException.scala
+++ b/core/src/main/scala/kafka/common/AuthorizationException.scala
@@ -20,6 +20,17 @@ package kafka.common
  * Exception thrown when a principal is not authorized to perform an operation.
  * @param message
  */
-class AuthorizationException(message: String) extends RuntimeException(message) {
+abstract class AuthorizationException(message: String) extends RuntimeException(message) {
+}
+
+class TopicAuthorizationException(message: String) extends AuthorizationException(message) {
+  def this() = this(null)
+}
+
+class GroupAuthorizationException(message: String) extends AuthorizationException(message) {
+  def this() = this(null)
+}
+
+class ClusterAuthorizationException(message: String) extends AuthorizationException(message) {
   def this() = this(null)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 6f53fac..e0ebe94 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -57,10 +57,11 @@ object ErrorMapping {
   // 24: UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY
   // 25: UNKNOWN_CONSUMER_ID
   // 26: INVALID_SESSION_TIMEOUT
-  // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
+  // 27: REBALANCE_IN_PROGRESS
   // 28: INVALID_COMMIT_OFFSET_SIZE
-  val AuthorizationCode: Short = 29
-  // 30: REBALANCE_IN_PROGRESS
+  val TopicAuthorizationCode: Short = 29
+  val GroupAuthorizationCode: Short = 30
+  val ClusterAuthorizationCode: Short = 31
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -83,7 +84,9 @@ object ErrorMapping {
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
       classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
-      classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode
+      classOf[TopicAuthorizationException].asInstanceOf[Class[Throwable]] -> TopicAuthorizationCode,
+      classOf[GroupAuthorizationException].asInstanceOf[Class[Throwable]] -> GroupAuthorizationCode,
+      classOf[ClusterAuthorizationException].asInstanceOf[Class[Throwable]] -> ClusterAuthorizationCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index ceb6348..bb39722 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -16,25 +16,28 @@
  */
 package kafka.security.auth
 
-import kafka.common.{BaseEnum, KafkaException}
+import kafka.common.{ErrorMapping, BaseEnum, KafkaException}
 
 /**
  * ResourceTypes.
  */
 
 
-sealed trait ResourceType extends BaseEnum
+sealed trait ResourceType extends BaseEnum { def errorCode: Short }
 
 case object Cluster extends ResourceType {
   val name = "Cluster"
+  val errorCode = ErrorMapping.ClusterAuthorizationCode
 }
 
 case object Topic extends ResourceType {
   val name = "Topic"
+  val errorCode = ErrorMapping.TopicAuthorizationCode
 }
 
 case object Group extends ResourceType {
   val name = "Group"
+  val errorCode = ErrorMapping.GroupAuthorizationCode
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a2e0b9..21434f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -183,6 +183,14 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
     val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
 
+    // reject the request immediately if not authorized to the group
+    if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
+      val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode)
+      val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+      return
+    }
+
     // filter non-exist topics
     val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
       !metadataCache.contains(topicAndPartition.topic)
@@ -191,13 +199,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
       case (topicAndPartition, offsetMetadata) =>
-        authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
-          authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))
+        authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
     }
 
     // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
-      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode)
+      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
 
       mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
         // we only print warnings for known errors here; only replica manager could see an unknown
@@ -298,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
       var errorInResponse = false
-      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1))
+      val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
 
       mergedResponseStatus.foreach { case (topicAndPartition, status) =>
         // we only print warnings for known errors here; if it is unknown, it will cause
@@ -379,7 +386,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty))
 
     // the callback for sending a fetch response
     def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
@@ -442,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.TopicAuthorizationCode, Nil))
 
     val responseMap = authorizedRequestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
@@ -614,7 +621,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
+    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
 
     val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
@@ -630,12 +637,19 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
+    // reject the request immediately if not authorized to the group
+    if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
+      val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode)
+      val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+      return
+    }
+
     val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
-      authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
-        authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
+      authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
     }
 
-    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
+    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode)
     val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
 
     val response = if (offsetFetchRequest.versionId == 0) {
@@ -659,9 +673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       })
 
-      val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition =>
-        (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode)))
-      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId)
+      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId)
     } else {
       // version 1 reads offsets from Kafka;
       val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
@@ -683,7 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseHeader = new ResponseHeader(request.header.correlationId)
 
     if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
-      val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
@@ -716,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groups = describeRequest.groupIds().asScala.map {
       case groupId =>
         if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
-          groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+          groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
         } else {
           val (error, summary) = coordinator.handleDescribeGroup(groupId)
           val members = summary.members.map { member =>
@@ -738,7 +750,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseHeader = new ResponseHeader(request.header.correlationId)
     val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
-      ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+      ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
     } else {
       val (error, groups) = coordinator.handleListGroups()
       val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
@@ -766,7 +778,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
       val responseBody = new JoinGroupResponse(
-        ErrorMapping.AuthorizationCode,
+        ErrorMapping.GroupAuthorizationCode,
         JoinGroupResponse.UNKNOWN_GENERATION_ID,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@@ -801,7 +813,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
-      sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
+      sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode)
     } else {
       coordinator.handleSyncGroup(
         syncGroupRequest.groupId(),
@@ -826,7 +838,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
+      val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
     }
     else {
@@ -877,7 +889,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
+      val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
     } else {
       // let the coordinator to handle leave-group
@@ -897,7 +909,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
-      throw new AuthorizationException(s"Request $request is not authorized.")
+      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
 
 }