You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/29 16:30:13 UTC
kafka git commit: KAFKA-2598; Adding integration test for the authorizer at API level. …
Repository: kafka
Updated Branches:
refs/heads/trunk 47c888078 -> 1b5687b9e
KAFKA-2598; Adding integration test for the authorizer at API level. …
…Some bug fixes that I encountered while running the tests.
Author: Parth Brahmbhatt <br...@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #300 from Parth-Brahmbhatt/KAFKA-2598
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b5687b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b5687b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b5687b9
Branch: refs/heads/trunk
Commit: 1b5687b9e4cd10e0f91fb921d3569fdd44be163d
Parents: 47c8880
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Oct 29 08:30:06 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 29 08:30:06 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 2 +
.../clients/consumer/internals/Fetcher.java | 26 +-
.../common/errors/AuthorizationException.java | 19 +
.../common/network/SaslChannelBuilder.java | 3 +-
.../kafka/common/network/SslChannelBuilder.java | 3 +-
.../common/requests/SyncGroupResponse.java | 4 +
.../kafka/common/security/kerberos/Login.java | 3 +-
.../kafka/api/ControlledShutdownRequest.scala | 4 +-
.../scala/kafka/api/UpdateMetadataRequest.scala | 2 +-
.../kafka/common/AuthorizationException.scala | 1 +
.../scala/kafka/security/auth/Authorizer.scala | 1 -
.../src/main/scala/kafka/server/KafkaApis.scala | 62 +--
.../kafka/api/AuthorizerIntegrationTest.scala | 388 +++++++++++++++++++
13 files changed, 477 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bc9ef21..cce13dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -779,6 +779,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+ *
+ * @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read permission on topic.
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e18a58b..4e0d5ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -23,10 +23,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.errors.InvalidMetadataException;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -57,7 +54,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import java.util.HashSet;
/**
* This class manage the fetching process with the brokers.
@@ -81,6 +78,7 @@ public class Fetcher<K, V> {
private final Deserializer<V> valueDeserializer;
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+ private final Set<TopicPartition> unauthorizedTopicPartitions;
private final Map<TopicPartition, Long> recordTooLargePartitions;
public Fetcher(ConsumerNetworkClient client,
@@ -112,6 +110,7 @@ public class Fetcher<K, V> {
this.records = new LinkedList<PartitionRecords<K, V>>();
this.offsetOutOfRangePartitions = new HashMap<>();
+ this.unauthorizedTopicPartitions = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
@@ -303,6 +302,19 @@ public class Fetcher<K, V> {
}
/**
+ * If any topic from previous fetchResponse contatains Authorization error, throw ApiException.
+ * @throws ApiException
+ */
+ private void throwIfUnauthorized() throws ApiException {
+ if (!unauthorizedTopicPartitions.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (TopicPartition topicPartition : unauthorizedTopicPartitions)
+ sb.append(topicPartition + ",");
+ unauthorizedTopicPartitions.clear();
+ throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString()));
+ }
+ }
+ /**
* If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
*
* @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
@@ -334,6 +346,7 @@ public class Fetcher<K, V> {
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
throwIfOffsetOutOfRange();
+ throwIfUnauthorized();
throwIfRecordTooLarge();
for (PartitionRecords<K, V> part : this.records) {
@@ -544,6 +557,9 @@ public class Fetcher<K, V> {
else
this.offsetOutOfRangePartitions.put(tp, fetchOffset);
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+ } else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) {
+ log.warn("Not authorized to read from topic {}.", tp.topic());
+ unauthorizedTopicPartitions.add(tp);
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
new file mode 100644
index 0000000..2a01e5e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AuthorizationException extends ApiException {
+ public AuthorizationException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 4d52738..d50055a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -103,8 +103,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
return SslTransportLayer.create(id, key,
- sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
- socketChannel.socket().getPort()));
+ sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()));
} else {
return new PlaintextTransportLayer(key);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 8edd37e..9a7ba0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -69,7 +69,6 @@ public class SslChannelBuilder implements ChannelBuilder {
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
return SslTransportLayer.create(id, key,
- sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
- socketChannel.socket().getPort()));
+ sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index a96b7e5..0eb92f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -68,4 +68,8 @@ public class SyncGroupResponse extends AbstractRequestResponse {
return memberState;
}
+ public static SyncGroupResponse parse(ByteBuffer buffer) {
+ return new SyncGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
index 470ab96..be91845 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
@@ -341,8 +341,7 @@ public class Login {
* Re-login a principal. This method assumes that {@link #login(String)} has happened already.
* @throws javax.security.auth.login.LoginException on a failure
*/
- private synchronized void reLogin()
- throws LoginException {
+ private synchronized void reLogin() throws LoginException {
if (!isKrbTicket) {
return;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 33c107f..6fb9e22 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -58,7 +58,7 @@ case class ControlledShutdownRequest(versionId: Short,
def sizeInBytes: Int = {
2 + /* version id */
4 + /* correlation id */
- clientId.fold(0)(shortStringLength)
+ clientId.fold(0)(shortStringLength) +
4 /* broker id */
}
@@ -67,7 +67,7 @@ case class ControlledShutdownRequest(versionId: Short,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+ val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index d59de82..11c32cd 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -127,7 +127,7 @@ case class UpdateMetadataRequest (versionId: Short,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
+ val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/common/AuthorizationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala
index 009cf1a..12ee0fe 100644
--- a/core/src/main/scala/kafka/common/AuthorizationException.scala
+++ b/core/src/main/scala/kafka/common/AuthorizationException.scala
@@ -21,4 +21,5 @@ package kafka.common
* @param message
*/
class AuthorizationException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 8f1a660..939ed12 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -18,7 +18,6 @@
package kafka.security.auth
import kafka.network.RequestChannel.Session
-import kafka.server.KafkaConfig
import org.apache.kafka.common.Configurable
import org.apache.kafka.common.security.auth.KafkaPrincipal
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2ef9730..af6bb5e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -212,7 +212,9 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
- if (offsetCommitRequest.versionId == 0) {
+ if (authorizedRequestInfo.isEmpty)
+ sendResponseCallback(Map.empty)
+ else if (offsetCommitRequest.versionId == 0) {
// for version 0 always store offsets to ZK
val responseInfo = authorizedRequestInfo.map {
case (topicAndPartition, metaAndError) => {
@@ -339,22 +341,26 @@ class KafkaApis(val requestChannel: RequestChannel,
produceResponseCallback)
}
- // only allow appending to internal topic partitions
- // if the client is not from admin
- val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
-
- // call the replica manager to append messages to the replicas
- replicaManager.appendMessages(
- produceRequest.ackTimeoutMs.toLong,
- produceRequest.requiredAcks,
- internalTopicsAllowed,
- authorizedRequestInfo,
- sendResponseCallback)
-
- // if the request is put into the purgatory, it will have a held reference
- // and hence cannot be garbage collected; hence we clear its data here in
- // order to let GC re-claim its memory since it is already appended to log
- produceRequest.emptyData()
+ if (authorizedRequestInfo.isEmpty)
+ sendResponseCallback(Map.empty)
+ else {
+ // only allow appending to internal topic partitions
+ // if the client is not from admin
+ val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
+
+ // call the replica manager to append messages to the replicas
+ replicaManager.appendMessages(
+ produceRequest.ackTimeoutMs.toLong,
+ produceRequest.requiredAcks,
+ internalTopicsAllowed,
+ authorizedRequestInfo,
+ sendResponseCallback)
+
+ // if the request is put into the purgatory, it will have a held reference
+ // and hence cannot be garbage collected; hence we clear its data here in
+ // order to let GC re-claim its memory since it is already appended to log
+ produceRequest.emptyData()
+ }
}
/**
@@ -387,7 +393,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def fetchResponseCallback(delayTimeMs: Int) {
- val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs)
+ val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
}
@@ -403,13 +409,17 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- // call the replica manager to fetch messages from the local replica
- replicaManager.fetchMessages(
- fetchRequest.maxWait.toLong,
- fetchRequest.replicaId,
- fetchRequest.minBytes,
- authorizedRequestInfo,
- sendResponseCallback)
+ if (authorizedRequestInfo.isEmpty)
+ sendResponseCallback(Map.empty)
+ else {
+ // call the replica manager to fetch messages from the local replica
+ replicaManager.fetchMessages(
+ fetchRequest.maxWait.toLong,
+ fetchRequest.replicaId,
+ fetchRequest.minBytes,
+ authorizedRequestInfo,
+ sendResponseCallback)
+ }
}
/**
@@ -596,7 +606,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
- val topicMetadata = getTopicMetadata(authorizedTopics, request.securityProtocol)
+ val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
new file mode 100644
index 0000000..b1db12a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.concurrent.ExecutionException
+import java.util.{ArrayList, Properties}
+
+import kafka.api.RequestKeys
+import kafka.cluster.EndPoint
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.coordinator.GroupCoordinator
+import kafka.integration.KafkaServerTestHarness
+import kafka.security.auth._
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Assert._
+import org.junit.{After, Assert, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.Buffer
+
+class AuthorizerIntegrationTest extends KafkaServerTestHarness {
+ val topic = "topic"
+ val part = 0
+ val brokerId: Integer = 0
+ val correlationId = 0
+ val clientId = "client-Id"
+ val tp = new TopicPartition(topic, part)
+ val topicAndPartition = new TopicAndPartition(topic, part)
+ val group = "my-group"
+ val topicResource = new Resource(Topic, topic)
+ val groupResource = new Resource(Group, group)
+
+ val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+ val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+ val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+ val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+ val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+
+ val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+ val numServers = 1
+ val producerCount = 1
+ val consumerCount = 2
+ val producerConfig = new Properties
+ val numRecords = 1
+
+ val overridingProps = new Properties()
+ overridingProps.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+ overridingProps.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+ overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+
+ val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
+
+ var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null
+
+ val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
+ Map(RequestKeys.MetadataKey -> classOf[MetadataResponse],
+ RequestKeys.ProduceKey -> classOf[ProduceResponse],
+ RequestKeys.FetchKey -> classOf[FetchResponse],
+ RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
+ RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
+ RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
+ RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+ RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
+ RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
+ RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
+ RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse],
+ RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse],
+ RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse],
+ RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse],
+ RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse]
+ )
+
+ val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
+ RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+ RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
+ RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+ RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+ RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+ RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+ RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+ RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
+ RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
+ RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
+ RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()),
+ RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()),
+ RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+ RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+ RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode())
+ )
+
+ val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
+ RequestKeys.MetadataKey -> TopicDescribeAcl,
+ RequestKeys.ProduceKey -> TopicWriteAcl,
+ RequestKeys.FetchKey -> TopicReadAcl,
+ RequestKeys.OffsetsKey -> TopicDescribeAcl,
+ RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
+ RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
+ RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+ RequestKeys.UpdateMetadataKey -> ClusterAcl,
+ RequestKeys.JoinGroupKey -> GroupReadAcl,
+ RequestKeys.SyncGroupKey -> GroupReadAcl,
+ RequestKeys.HeartbeatKey -> GroupReadAcl,
+ RequestKeys.LeaveGroupKey -> GroupReadAcl,
+ RequestKeys.LeaderAndIsrKey -> ClusterAcl,
+ RequestKeys.StopReplicaKey -> ClusterAcl,
+ RequestKeys.ControlledShutdownKey -> ClusterAcl
+ )
+
+ // configure the servers and clients
+ override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+
+ @Before
+ override def setUp() {
+ super.setUp()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+
+ for (i <- 0 until producerCount)
+ producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+ acks = 1)
+ for (i <- 0 until consumerCount)
+ consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
+
+ // create the consumer offset topic
+ TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
+ 1,
+ 1,
+ servers,
+ servers.head.consumerCoordinator.offsetsTopicConfigs)
+ // create the test topic with all the brokers as replicas
+ TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+
+ val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer",
+ List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
+
+ //we have to get a join call so the group is created and we get back a memberId
+ addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+ val socket = new Socket("localhost", servers.head.boundPort())
+ val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse]
+ val memberId = joinResponse.memberId()
+
+ //remove group acls
+ removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+
+ RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
+ RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava),
+ RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava),
+ RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
+ RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
+ RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
+ RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+ RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
+ Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+ Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
+ RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer",
+ List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava),
+ RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava),
+ RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava),
+ RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId),
+ RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId),
+ RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue,
+ Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+ Set(new LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava),
+ RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava),
+ RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId)
+ )
+ }
+
+ @After
+ override def tearDown() = {
+ removeAllAcls
+ super.tearDown()
+ }
+
+ @Test
+ def testAuthorization() {
+ val socket = new Socket("localhost", servers.head.boundPort())
+
+ for ((key, request) <- RequestKeyToRequest) {
+ removeAllAcls
+
+ sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode)
+
+ for ((resource, acls) <- RequestKeysToAcls(key))
+ addAndVerifyAcls(acls, resource)
+
+ sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError)
+ }
+ }
+
+ @Test
+ def testProduceNeedsAuthorization() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+ try {
+ sendRecords(numRecords, tp)
+ Assert.fail("should have thrown exception")
+ } catch {
+ case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage)
+ }
+ }
+
+ @Test
+ def testOnlyWritePermissionAllowsWritingToProducer() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(numRecords, tp)
+ }
+
+ @Test
+ def testCreatePermissionNeededForWritingToNonExistentTopic() {
+ val newTopic = "newTopic"
+ val topicPartition = new TopicPartition(newTopic, 0)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+ try {
+ sendRecords(numRecords, topicPartition)
+ Assert.fail("should have thrown exception")
+ } catch {
+ case e: TimeoutException =>
+ //TODO Need to update the producer so it actually throws the server side of exception.
+ case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e thrown")
+ }
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create),
+ new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+ sendRecords(numRecords, topicPartition)
+ }
+
+ @Test
+ def testConsumerNeedsAuthorization() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ //TODO: Ideally we would want to test that when consumerGroup permission is not present we still get an AuthorizationException
+ //but the consumer fetcher currently waits forever for the consumer metadata to become available.
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ sendRecords(1, tp)
+ try {
+ this.consumers.head.assign(List(tp).asJava)
+ consumeRecords(this.consumers.head)
+ Assert.fail("should have thrown exception")
+ } catch {
+ case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage)
+ }
+ }
+
+ @Test
+ def testAllowingReadOnTopicAndGroupAllowsReading() {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+ sendRecords(1, tp)
+ this.consumers.head.assign(List(tp).asJava)
+ consumeRecords(this.consumers.head)
+ }
+
+// TODO: The following test goes into an infinite loop as consumer waits for consumer metadata to be propogated for ever.
+// @Test
+// def testCreatePermissionNeededToReadFromNonExistentTopic() {
+// val newTopic = "newTopic"
+// val topicPartition = new TopicPartition(newTopic, 0)
+// val newTopicResource = new Resource(Topic, newTopic)
+// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
+// addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+// addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
+// try {
+// this.consumers(0).assign(List(topicPartition).asJava)
+// consumeRecords(this.consumers(0))
+// Assert.fail("should have thrown exception")
+// } catch {
+// //checking for the message and type to ensure whenever these things are fixed on client side the test starts failing.
+// case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.")
+// }
+//
+// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
+// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+//
+// sendRecords(numRecords, topicPartition)
+// consumeRecords(this.consumers(0))
+// }
+
+ def removeAllAcls() = {
+ servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
+ servers.head.apis.authorizer.get.removeAcls(resource)
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+ }
+ }
+
+ def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = {
+ val header = new RequestHeader(key, "client", 1)
+ val body = request.toStruct
+
+ val buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf())
+ header.writeTo(buffer)
+ body.writeTo(buffer)
+ buffer.rewind()
+ val requestBytes = buffer.array()
+
+ sendRequest(socket, key, requestBytes)
+ val resp = receiveResponse(socket)
+ ResponseHeader.parse(resp)
+
+ val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
+ Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response))
+ response
+ }
+
+ private def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
+ val outgoing = new DataOutputStream(socket.getOutputStream)
+ outgoing.writeInt(request.length)
+ outgoing.write(request)
+ outgoing.flush()
+ }
+
+ private def receiveResponse(socket: Socket): ByteBuffer = {
+ val incoming = new DataInputStream(socket.getInputStream)
+ val len = incoming.readInt()
+ val response = new Array[Byte](len)
+ incoming.readFully(response)
+ ByteBuffer.wrap(response)
+ }
+
+ private def sendRecords(numRecords: Int, tp: TopicPartition) {
+ val futures = (0 until numRecords).map { i =>
+ this.producers.head.send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
+ }
+ try {
+ futures.foreach(_.get)
+ } catch {
+ case e: ExecutionException => throw e.getCause
+ }
+ }
+
+ private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
+ servers.head.apis.authorizer.get.addAcls(acls, resource)
+ TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+ }
+
+ private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
+ servers.head.apis.authorizer.get.removeAcls(acls, resource)
+ TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
+ }
+
+
+ private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int =
+ 0) {
+ val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+ val maxIters = numRecords * 50
+ var iters = 0
+ while (records.size < numRecords) {
+ for (record <- consumer.poll(50).asScala) {
+ records.add(record)
+ }
+ if (iters > maxIters)
+ throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
+ iters += 1
+ }
+ for (i <- 0 until numRecords) {
+ val record = records.get(i)
+ val offset = startingOffset + i
+ assertEquals(topic, record.topic())
+ assertEquals(part, record.partition())
+ assertEquals(offset.toLong, record.offset())
+ }
+ }
+}