You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/04 20:02:42 UTC
[2/2] kafka git commit: KAFKA-2691: Improve handling of authorization
failure during metadata refresh
KAFKA-2691: Improve handling of authorization failure during metadata refresh
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jun Rao
Closes #394 from hachikuji/KAFKA-2691
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c39e79bb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c39e79bb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c39e79bb
Branch: refs/heads/trunk
Commit: c39e79bb5af5b4e56bec358f8ec3758e6822dbcf
Parents: c30ee50
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 4 11:02:30 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Nov 4 11:02:30 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 6 +-
.../kafka/clients/consumer/KafkaConsumer.java | 28 +-
.../consumer/internals/AbstractCoordinator.java | 39 +-
.../consumer/internals/ConsumerCoordinator.java | 29 +-
.../clients/consumer/internals/Fetcher.java | 29 +-
.../kafka/clients/producer/KafkaProducer.java | 30 +-
.../clients/producer/internals/Sender.java | 3 +
.../java/org/apache/kafka/common/Cluster.java | 32 +-
.../common/errors/AuthorizationException.java | 2 +
.../errors/GroupAuthorizationException.java | 27 +
.../errors/TopicAuthorizationException.java | 33 ++
.../apache/kafka/common/protocol/Errors.java | 12 +-
.../requests/GroupCoordinatorResponse.java | 9 +
.../common/requests/HeartbeatResponse.java | 4 +-
.../common/requests/JoinGroupResponse.java | 3 +-
.../common/requests/LeaveGroupResponse.java | 1 +
.../kafka/common/requests/MetadataRequest.java | 16 +-
.../kafka/common/requests/MetadataResponse.java | 18 +-
.../common/requests/OffsetCommitResponse.java | 6 +-
.../common/requests/OffsetFetchResponse.java | 4 +-
.../common/requests/SyncGroupResponse.java | 4 +-
.../org/apache/kafka/clients/MetadataTest.java | 12 +-
.../internals/ConsumerCoordinatorTest.java | 20 +-
.../clients/consumer/internals/FetcherTest.java | 18 +
.../clients/producer/MockProducerTest.java | 3 +-
.../internals/DefaultPartitionerTest.java | 3 +-
.../internals/RecordAccumulatorTest.java | 4 +-
.../common/requests/RequestResponseTest.java | 3 +-
.../java/org/apache/kafka/test/TestUtils.java | 3 +-
.../runtime/distributed/WorkerCoordinator.java | 1 -
.../kafka/common/AuthorizationException.scala | 13 +-
.../main/scala/kafka/common/ErrorMapping.scala | 11 +-
.../kafka/security/auth/ResourceType.scala | 7 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 54 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 508 +++++++++++++------
.../processor/DefaultPartitionGrouperTest.java | 3 +-
.../processor/internals/StreamThreadTest.java | 2 +-
37 files changed, 716 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f2fca12..7e77c18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -238,14 +238,18 @@ public final class Metadata {
}
private Cluster getClusterForCurrentTopics(Cluster cluster) {
+ Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
if (cluster != null) {
+ unauthorizedTopics.addAll(cluster.unauthorizedTopics());
+ unauthorizedTopics.retainAll(this.topics);
+
for (String topic : this.topics) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
nodes = cluster.nodes();
}
- return new Cluster(nodes, partitionInfos);
+ return new Cluster(nodes, partitionInfos, unauthorizedTopics);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cce13dd..a6be519 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -544,7 +544,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metricGrpPrefix,
metricsTags,
this.time,
- requestTimeoutMs,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
@@ -777,10 +776,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic
* offset reset policy has been configured.
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
- * the defaultResetPolicy is NONE
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
- *
- * @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read permission on topic.
+ * the defaultResetPolicy is NONE
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
+ * topics or to the configured groupId
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
@@ -883,7 +883,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* encountered (in which case it is thrown to the caller).
*
* @param offsets A map of offsets by partition with associated metadata
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1008,7 +1011,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The offset
* @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
* available.
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ *
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId
*/
public long position(TopicPartition partition) {
acquire();
@@ -1035,7 +1042,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
- * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1160,7 +1170,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
- * The thread which is blocking in an operation will throw {@link WakeupException}.
+ * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
*/
@Override
public void wakeup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9cf825c..781ff78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -88,7 +89,6 @@ public abstract class AbstractCoordinator {
protected final ConsumerNetworkClient client;
protected final Time time;
protected final long retryBackoffMs;
- protected final long requestTimeoutMs;
private boolean needsJoinPrepare = true;
private boolean rejoinNeeded = true;
@@ -108,7 +108,6 @@ public abstract class AbstractCoordinator {
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
- long requestTimeoutMs,
long retryBackoffMs) {
this.client = client;
this.time = time;
@@ -120,7 +119,6 @@ public abstract class AbstractCoordinator {
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask();
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
- this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs;
}
@@ -178,7 +176,7 @@ public abstract class AbstractCoordinator {
public void ensureCoordinatorKnown() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupMetadataRequest();
- client.poll(future, requestTimeoutMs);
+ client.poll(future);
if (future.failed()) {
if (future.isRetriable())
@@ -376,6 +374,8 @@ public abstract class AbstractCoordinator {
log.error("Attempt to join group {} failed due to: {}",
groupId, error.exception().getMessage());
future.raise(error);
+ } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: "
@@ -427,6 +427,8 @@ public abstract class AbstractCoordinator {
if (errorCode == Errors.NONE.code()) {
future.complete(syncResponse.memberAssignment());
sensors.syncLatency.record(response.requestLatencyMs());
+ } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ future.raise(new GroupAuthorizationException(groupId));
} else {
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.forCode(errorCode));
@@ -476,7 +478,8 @@ public abstract class AbstractCoordinator {
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
- if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
+ short errorCode = groupCoordinatorResponse.errorCode();
+ if (errorCode == Errors.NONE.code()) {
this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
@@ -487,8 +490,10 @@ public abstract class AbstractCoordinator {
if (generation > 0)
heartbeatTask.reset();
future.complete(null);
+ } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ future.raise(new GroupAuthorizationException(groupId));
} else {
- future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
+ future.raise(Errors.forCode(errorCode));
}
}
}
@@ -538,31 +543,33 @@ public abstract class AbstractCoordinator {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
- short error = heartbeatResponse.errorCode();
- if (error == Errors.NONE.code()) {
+ short errorCode = heartbeatResponse.errorCode();
+ if (errorCode == Errors.NONE.code()) {
log.debug("Received successful heartbeat response.");
future.complete(null);
- } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
- || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
coordinatorDead();
- future.raise(Errors.forCode(error));
- } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+ future.raise(Errors.forCode(errorCode));
+ } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
- } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+ } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
- } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
+ } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
+ } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ future.raise(new GroupAuthorizationException(groupId));
} else {
- future.raise(new KafkaException("Unexpected error in heartbeat response: "
- + Errors.forCode(error).exception().getMessage()));
+ future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
+ + Errors.forCode(errorCode).exception().getMessage()));
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5b5d6ff..c7323cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,6 +15,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@@ -82,7 +84,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
- long requestTimeoutMs,
long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
@@ -95,7 +96,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
metricGrpPrefix,
metricTags,
time,
- requestTimeoutMs,
retryBackoffMs);
this.metadata = metadata;
@@ -136,6 +136,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
+ // if we encounter any unauthorized topics, raise an exception to the user
+ if (!cluster.unauthorizedTopics().isEmpty())
+ throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
+
if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
@@ -340,13 +344,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
- if (future.succeeded()) {
+ if (future.succeeded())
return;
- }
- if (!future.isRetriable()) {
+ if (!future.isRetriable())
throw future.exception();
- }
Utils.sleep(retryBackoffMs);
}
@@ -439,6 +441,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
+ Set<String> unauthorizedTopics = new HashSet<>();
+
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
@@ -450,6 +454,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
+ } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ future.raise(new GroupAuthorizationException(groupId));
+ return;
+ } else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+ unauthorizedTopics.add(tp.topic());
} else {
if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
// just retry
@@ -458,7 +467,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
|| errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
coordinatorDead();
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
- || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+ || errorCode == Errors.ILLEGAL_GENERATION.code()
+ || errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
// need to re-join group
subscriptions.needReassignment();
}
@@ -473,7 +483,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
}
}
- future.complete(null);
+ if (!unauthorizedTopics.isEmpty())
+ future.raise(new TopicAuthorizationException(unauthorizedTopics));
+ else
+ future.complete(null);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4e0d5ec..7c5bca6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -78,7 +78,7 @@ public class Fetcher<K, V> {
private final Deserializer<V> valueDeserializer;
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
- private final Set<TopicPartition> unauthorizedTopicPartitions;
+ private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, Long> recordTooLargePartitions;
public Fetcher(ConsumerNetworkClient client,
@@ -110,7 +110,7 @@ public class Fetcher<K, V> {
this.records = new LinkedList<PartitionRecords<K, V>>();
this.offsetOutOfRangePartitions = new HashMap<>();
- this.unauthorizedTopicPartitions = new HashSet<>();
+ this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
@@ -302,19 +302,18 @@ public class Fetcher<K, V> {
}
/**
- * If any topic from previous fetchResponse contatains Authorization error, throw ApiException.
- * @throws ApiException
+ * If any topic from previous fetchResponse contains an Authorization error, raise an exception
+ * @throws TopicAuthorizationException
*/
- private void throwIfUnauthorized() throws ApiException {
- if (!unauthorizedTopicPartitions.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- for (TopicPartition topicPartition : unauthorizedTopicPartitions)
- sb.append(topicPartition + ",");
- unauthorizedTopicPartitions.clear();
- throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString()));
+ private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
+ if (!unauthorizedTopics.isEmpty()) {
+ Set<String> topics = new HashSet<>(unauthorizedTopics);
+ unauthorizedTopics.clear();
+ throw new TopicAuthorizationException(topics);
}
}
- /**
+
+ /**
* If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
*
* @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
@@ -346,7 +345,7 @@ public class Fetcher<K, V> {
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
throwIfOffsetOutOfRange();
- throwIfUnauthorized();
+ throwIfUnauthorizedTopics();
throwIfRecordTooLarge();
for (PartitionRecords<K, V> part : this.records) {
@@ -557,9 +556,9 @@ public class Fetcher<K, V> {
else
this.offsetOutOfRangePartitions.put(tp, fetchOffset);
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
- } else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) {
+ } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
log.warn("Not authorized to read from topic {}.", tp.topic());
- unauthorizedTopicPartitions.add(tp);
+ unauthorizedTopics.add(tp.topic());
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ff3bfe6..21d635b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@@ -473,21 +474,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (!this.metadata.containsTopic(topic))
this.metadata.add(topic);
- if (metadata.fetch().partitionsForTopic(topic) != null) {
+ if (metadata.fetch().partitionsForTopic(topic) != null)
return;
- } else {
- long begin = time.milliseconds();
- long remainingWaitMs = maxWaitMs;
- while (metadata.fetch().partitionsForTopic(topic) == null) {
- log.trace("Requesting metadata update for topic {}.", topic);
- int version = metadata.requestUpdate();
- sender.wakeup();
- metadata.awaitUpdate(version, remainingWaitMs);
- long elapsed = time.milliseconds() - begin;
- if (elapsed >= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
- remainingWaitMs = maxWaitMs - elapsed;
- }
+
+ long begin = time.milliseconds();
+ long remainingWaitMs = maxWaitMs;
+ while (metadata.fetch().partitionsForTopic(topic) == null) {
+ log.trace("Requesting metadata update for topic {}.", topic);
+ int version = metadata.requestUpdate();
+ sender.wakeup();
+ metadata.awaitUpdate(version, remainingWaitMs);
+ long elapsed = time.milliseconds() - begin;
+ if (elapsed >= maxWaitMs)
+ throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+ if (metadata.fetch().unauthorizedTopics().contains(topic))
+ throw new TopicAuthorizationException(topic);
+ remainingWaitMs = maxWaitMs - elapsed;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 56be021..cada626 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@@ -288,6 +289,8 @@ public class Sender implements Runnable {
error);
this.accumulator.reenqueue(batch, now);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+ } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
+ batch.done(baseOffset, new TopicAuthorizationException(batch.topicPartition.topic()));
} else {
// tell the user the result of their request
batch.done(baseOffset, error.exception());
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index e6a2e43..c7f3e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -23,6 +23,7 @@ import java.util.*;
public final class Cluster {
private final List<Node> nodes;
+ private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -34,26 +35,28 @@ public final class Cluster {
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
- public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
+ public Cluster(Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics) {
// make a randomized, unmodifiable copy of the nodes
- List<Node> copy = new ArrayList<Node>(nodes);
+ List<Node> copy = new ArrayList<>(nodes);
Collections.shuffle(copy);
this.nodes = Collections.unmodifiableList(copy);
- this.nodesById = new HashMap<Integer, Node>();
+ this.nodesById = new HashMap<>();
for (Node node: nodes)
this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
- this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
+ this.partitionsByTopicPartition = new HashMap<>(partitions.size());
for (PartitionInfo p : partitions)
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
// index the partitions by topic and node respectively, and make the lists
// unmodifiable so we can hand them out in user-facing apis without risk
// of the client modifying the contents
- HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
- HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+ HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
+ HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
for (Node n : this.nodes) {
partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
}
@@ -68,30 +71,31 @@ public final class Cluster {
psNode.add(p);
}
}
- this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
- this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+ this.partitionsByTopic = new HashMap<>(partsForTopic.size());
+ this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
String topic = entry.getKey();
List<PartitionInfo> partitionList = entry.getValue();
this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
- List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
+ List<PartitionInfo> availablePartitions = new ArrayList<>();
for (PartitionInfo part : partitionList) {
if (part.leader() != null)
availablePartitions.add(part);
}
this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
}
- this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+ this.partitionsByNode = new HashMap<>(partsForNode.size());
for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+ this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
}
/**
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
- return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
+ return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
}
/**
@@ -104,7 +108,7 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
- return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
+ return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
}
/**
@@ -190,6 +194,10 @@ public final class Cluster {
return this.partitionsByTopic.keySet();
}
+ public Set<String> unauthorizedTopics() {
+ return unauthorizedTopics;
+ }
+
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
index 2a01e5e..7fc932d 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -13,7 +13,9 @@
package org.apache.kafka.common.errors;
public class AuthorizationException extends ApiException {
+
public AuthorizationException(String message) {
super(message);
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
new file mode 100644
index 0000000..3a767aa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class GroupAuthorizationException extends AuthorizationException {
+ private final String groupId;
+
+ public GroupAuthorizationException(String groupId) {
+ super("Not authorized to access group: " + groupId);
+ this.groupId = groupId;
+ }
+
+ public String groupId() {
+ return groupId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
new file mode 100644
index 0000000..b5d33b9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class TopicAuthorizationException extends AuthorizationException {
+ private final Set<String> unauthorizedTopics;
+
+ public TopicAuthorizationException(Set<String> unauthorizedTopics) {
+ super("Not authorized to access topics: " + unauthorizedTopics);
+ this.unauthorizedTopics = unauthorizedTopics;
+ }
+
+ public TopicAuthorizationException(String unauthorizedTopic) {
+ this(Collections.singleton(unauthorizedTopic));
+ }
+
+ public Set<String> unauthorizedTopics() {
+ return unauthorizedTopics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index d4eb1f9..516e50b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -84,12 +84,16 @@ public enum Errors {
new UnknownMemberIdException("The coordinator is not aware of this member.")),
INVALID_SESSION_TIMEOUT(26,
new ApiException("The session timeout is not within an acceptable range.")),
+ REBALANCE_IN_PROGRESS(27,
+ new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid")),
- AUTHORIZATION_FAILED(29,
- new ApiException("Request is not authorized.")),
- REBALANCE_IN_PROGRESS(30,
- new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
+ TOPIC_AUTHORIZATION_FAILED(29,
+ new AuthorizationException("Topic authorization failed.")),
+ GROUP_AUTHORIZATION_FAILED(30,
+ new AuthorizationException("Group authorization failed.")),
+ CLUSTER_AUTHORIZATION_FAILED(31,
+ new AuthorizationException("Cluster authorization failed."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index c28de70..3fe014d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -26,6 +26,15 @@ public class GroupCoordinatorResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String COORDINATOR_KEY_NAME = "coordinator";
+ /**
+ * Possible error codes:
+ *
+ * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+ * NOT_COORDINATOR_FOR_GROUP (16)
+ * GROUP_AUTHORIZATION_FAILED (30)
+ */
+
+
// coordinator level field names
private static final String NODE_ID_KEY_NAME = "node_id";
private static final String HOST_KEY_NAME = "host";
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 48cb4c0..a595efb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -25,12 +25,14 @@ public class HeartbeatResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
- * Possible error code:
+ * Possible error codes:
*
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
+ * REBALANCE_IN_PROGRESS (27)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 0615e5e..c869b1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -29,7 +29,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
- * Possible error code:
+ * Possible error codes:
*
* GROUP_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
@@ -37,6 +37,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
* INCONSISTENT_GROUP_PROTOCOL (23)
* UNKNOWN_MEMBER_ID (25)
* INVALID_SESSION_TIMEOUT (26)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private static final String GENERATION_ID_KEY_NAME = "generation_id";
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 278e3e8..e0ca117 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -30,6 +30,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse {
* CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* UNKNOWN_CONSUMER_ID (25)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f70e8da..cc1771b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -12,12 +12,6 @@
*/
package org.apache.kafka.common.requests;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -27,6 +21,13 @@ import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class MetadataRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
@@ -56,7 +57,8 @@ public class MetadataRequest extends AbstractRequest {
topicErrors.put(topic, Errors.forException(e));
}
- Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+ Cluster cluster = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList(),
+ Collections.<String>emptySet());
switch (versionId) {
case 0:
return new MetadataResponse(cluster, topicErrors);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c8f2d08..eb163dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -14,9 +14,12 @@ package org.apache.kafka.common.requests;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -159,8 +162,21 @@ public class MetadataResponse extends AbstractRequestResponse {
errors.put(topic, Errors.forCode(topicError));
}
}
+
this.errors = errors;
- this.cluster = new Cluster(brokers.values(), partitions);
+ this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors));
+ }
+
+ private Set<String> unauthorizedTopics(Map<String, Errors> topicErrors) {
+ if (topicErrors.isEmpty())
+ return Collections.emptySet();
+
+ Set<String> unauthorizedTopics = new HashSet<>();
+ for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
+ if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED)
+ unauthorizedTopics.add(topicErrorEntry.getKey());
+ }
+ return unauthorizedTopics;
}
public Map<String, Errors> errors() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index baea6f9..4d10a91 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -39,7 +39,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
- * Possible error code:
+ * Possible error codes:
*
* OFFSET_METADATA_TOO_LARGE (12)
* GROUP_LOAD_IN_PROGRESS (14)
@@ -47,8 +47,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
- * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
+ * REBALANCE_IN_PROGRESS (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
+ * TOPIC_AUTHORIZATION_FAILED (29)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private final Map<TopicPartition, Short> responseData;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index afc5618..88d68ea 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -45,13 +45,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
public static final String NO_METADATA = "";
/**
- * Possible error code:
+ * Possible error codes:
*
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* GROUP_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
+ * TOPIC_AUTHORIZATION_FAILED (29)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private final Map<TopicPartition, PartitionData> responseData;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 0eb92f2..7256bd2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -36,8 +36,8 @@ public class SyncGroupResponse extends AbstractRequestResponse {
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
- * REBALANCE_IN_PROGRESS (30)
- *
+ * REBALANCE_IN_PROGRESS (27)
+ * GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 55d7608..bd3bbe8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -128,7 +128,8 @@ public class MetadataTest {
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null))),
+ new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet()),
100);
assertArrayEquals("Metadata got updated with wrong set of topics.",
@@ -154,7 +155,8 @@ public class MetadataTest {
Arrays.asList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null))),
+ new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet()),
100);
assertEquals("Listener did not update topics list correctly",
@@ -179,7 +181,8 @@ public class MetadataTest {
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null))),
+ new PartitionInfo("topic1", 0, null, null, null)),
+ Collections.<String>emptySet()),
100);
metadata.removeListener(listener);
@@ -188,7 +191,8 @@ public class MetadataTest {
Arrays.asList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic2", 0, null, null, null),
- new PartitionInfo("topic3", 0, null, null, null))),
+ new PartitionInfo("topic3", 0, null, null, null)),
+ Collections.<String>emptySet()),
100);
assertEquals("Listener did not update topics list correctly",
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8667f22..7fd6d88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -114,7 +115,6 @@ public class ConsumerCoordinatorTest {
"consumer" + groupId,
metricTags,
time,
- requestTimeoutMs,
retryBackoffMs,
defaultOffsetCommitCallback,
autoCommitEnabled,
@@ -144,6 +144,24 @@ public class ConsumerCoordinatorTest {
assertTrue(future.succeeded());
}
+ @Test(expected = GroupAuthorizationException.class)
+ public void testGroupDescribeUnauthorized() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
+ coordinator.ensureCoordinatorKnown();
+ }
+
+ @Test(expected = GroupAuthorizationException.class)
+ public void testGroupReadUnauthorized() {
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
+ Errors.GROUP_AUTHORIZATION_FAILED.code()));
+ coordinator.ensurePartitionAssignment();
+ }
+
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1e7a215..8711830 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -147,6 +148,23 @@ public class FetcherTest {
}
@Test
+ public void testUnauthorizedTopic() {
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 0);
+
+ // initiate a fetch and have it answered with a TOPIC_AUTHORIZATION_FAILED error
+ fetcher.initFetches(cluster);
+ client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
+ consumerClient.poll(0);
+ try {
+ fetcher.fetchedRecords();
+ fail("fetchedRecords should have thrown");
+ } catch (TopicAuthorizationException e) {
+ assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
+ }
+ }
+
+ @Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(Arrays.asList(topicName), listener);
subscriptions.assignFromSubscribed(Arrays.asList(tp));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 7a46c56..0a0bdd8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -58,7 +59,7 @@ public class MockProducerTest {
public void testPartitioner() throws Exception {
PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
- Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1));
+ Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index 977fa93..7a5cef6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.Partitioner;
@@ -36,7 +37,7 @@ public class DefaultPartitionerTest {
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
new PartitionInfo(topic, 2, node1, nodes, nodes),
new PartitionInfo(topic, 0, node0, nodes, nodes));
- private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+ private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet());
@Test
public void testKeyPartitionIsStable() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 132b83b..4674a91 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -63,7 +63,7 @@ public class RecordAccumulatorTest {
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
+ private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
private Metrics metrics = new Metrics(time);
Map<String, String> metricTags = new LinkedHashMap<String, String>();
private final long maxBlockTimeMs = 1000;
@@ -314,7 +314,7 @@ public class RecordAccumulatorTest {
accum.append(tp1, key, value, null, 0);
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
- Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
+ Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());
now = time.milliseconds();
List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
assertEquals(1, expiredBatches.size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 761b9db..5ee11d2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -254,7 +254,8 @@ public class RequestResponseTest {
replicas[0] = node;
Node[] isr = new Node[1];
isr[0] = node;
- Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
+ Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)),
+ Collections.<String>emptySet());
Map<String, Errors> errors = new HashMap<String, Errors>();
errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index ccf3a5f..0fb59e6 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -54,7 +55,7 @@ public class TestUtils {
List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
- return new Cluster(asList(ns), parts);
+ return new Cluster(asList(ns), parts, Collections.<String>emptySet());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index 9fdbac7..d11165c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -81,7 +81,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
metricGrpPrefix,
metricTags,
time,
- requestTimeoutMs,
retryBackoffMs);
this.restUrl = restUrl;
this.configStorage = configStorage;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/common/AuthorizationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala
index 12ee0fe..55919a5 100644
--- a/core/src/main/scala/kafka/common/AuthorizationException.scala
+++ b/core/src/main/scala/kafka/common/AuthorizationException.scala
@@ -20,6 +20,17 @@ package kafka.common
* Exception thrown when a principal is not authorized to perform an operation.
* @param message
*/
-class AuthorizationException(message: String) extends RuntimeException(message) {
+abstract class AuthorizationException(message: String) extends RuntimeException(message) {
+}
+
+class TopicAuthorizationException(message: String) extends AuthorizationException(message) {
+ def this() = this(null)
+}
+
+class GroupAuthorizationException(message: String) extends AuthorizationException(message) {
+ def this() = this(null)
+}
+
+class ClusterAuthorizationException(message: String) extends AuthorizationException(message) {
def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 6f53fac..e0ebe94 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -57,10 +57,11 @@ object ErrorMapping {
// 24: UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY
// 25: UNKNOWN_CONSUMER_ID
// 26: INVALID_SESSION_TIMEOUT
- // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
+ // 27: REBALANCE_IN_PROGRESS
// 28: INVALID_COMMIT_OFFSET_SIZE
- val AuthorizationCode: Short = 29
- // 30: REBALANCE_IN_PROGRESS
+ val TopicAuthorizationCode: Short = 29
+ val GroupAuthorizationCode: Short = 30
+ val ClusterAuthorizationCode: Short = 31
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -83,7 +84,9 @@ object ErrorMapping {
classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
- classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode
+ classOf[TopicAuthorizationException].asInstanceOf[Class[Throwable]] -> TopicAuthorizationCode,
+ classOf[GroupAuthorizationException].asInstanceOf[Class[Throwable]] -> GroupAuthorizationCode,
+ classOf[ClusterAuthorizationException].asInstanceOf[Class[Throwable]] -> ClusterAuthorizationCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index ceb6348..bb39722 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -16,25 +16,28 @@
*/
package kafka.security.auth
-import kafka.common.{BaseEnum, KafkaException}
+import kafka.common.{ErrorMapping, BaseEnum, KafkaException}
/**
* ResourceTypes.
*/
-sealed trait ResourceType extends BaseEnum
+sealed trait ResourceType extends BaseEnum { def errorCode: Short }
case object Cluster extends ResourceType {
val name = "Cluster"
+ val errorCode = ErrorMapping.ClusterAuthorizationCode
}
case object Topic extends ResourceType {
val name = "Topic"
+ val errorCode = ErrorMapping.TopicAuthorizationCode
}
case object Group extends ResourceType {
val name = "Group"
+ val errorCode = ErrorMapping.GroupAuthorizationCode
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a2e0b9..21434f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -183,6 +183,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ // reject the request immediately if not authorized to the group
+ if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
+ val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode)
+ val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ return
+ }
+
// filter non-existent topics
val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
!metadataCache.contains(topicAndPartition.topic)
@@ -191,13 +199,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicAndPartition, offsetMetadata) =>
- authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
- authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))
+ authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
- val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode)
+ val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
// we only print warnings for known errors here; only replica manager could see an unknown
@@ -298,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
var errorInResponse = false
- val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1))
+ val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
mergedResponseStatus.foreach { case (topicAndPartition, status) =>
// we only print warnings for known errors here; if it is unknown, it will cause
@@ -379,7 +386,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
- val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty))
+ val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty))
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
@@ -442,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
}
- val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil))
+ val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.TopicAuthorizationCode, Nil))
val responseMap = authorizedRequestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
@@ -614,7 +621,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
+ val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
@@ -630,12 +637,19 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+ // reject the request immediately if not authorized to the group
+ if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
+ val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode)
+ val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ return
+ }
+
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
- authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
- authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
+ authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
}
- val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
+ val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
val response = if (offsetFetchRequest.versionId == 0) {
@@ -659,9 +673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
- val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition =>
- (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode)))
- OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId)
+ OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId)
} else {
// version 1 reads offsets from Kafka;
val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
@@ -683,7 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseHeader = new ResponseHeader(request.header.correlationId)
if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
- val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+ val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
@@ -716,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groups = describeRequest.groupIds().asScala.map {
case groupId =>
if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
- groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+ groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
} else {
val (error, summary) = coordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>
@@ -738,7 +750,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseHeader = new ResponseHeader(request.header.correlationId)
val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
- ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+ ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
} else {
val (error, groups) = coordinator.handleListGroups()
val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
@@ -766,7 +778,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
val responseBody = new JoinGroupResponse(
- ErrorMapping.AuthorizationCode,
+ ErrorMapping.GroupAuthorizationCode,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@@ -801,7 +813,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
- sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
+ sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode)
} else {
coordinator.handleSyncGroup(
syncGroupRequest.groupId(),
@@ -826,7 +838,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
- val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
+ val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
}
else {
@@ -877,7 +889,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
- val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
+ val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
} else {
// let the coordinator handle leave-group
@@ -897,7 +909,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
- throw new AuthorizationException(s"Request $request is not authorized.")
+ throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
}