Posted to commits@kafka.apache.org by ju...@apache.org on 2015/10/29 16:30:13 UTC

kafka git commit: KAFKA-2598; Adding integration test for the authorizer at API level.

Repository: kafka
Updated Branches:
  refs/heads/trunk 47c888078 -> 1b5687b9e


KAFKA-2598; Adding integration test for the authorizer at API level.

Some bug fixes that I encountered while running the tests.

Author: Parth Brahmbhatt <br...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #300 from Parth-Brahmbhatt/KAFKA-2598


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1b5687b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1b5687b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1b5687b9

Branch: refs/heads/trunk
Commit: 1b5687b9e4cd10e0f91fb921d3569fdd44be163d
Parents: 47c8880
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Oct 29 08:30:06 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Oct 29 08:30:06 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +
 .../clients/consumer/internals/Fetcher.java     |  26 +-
 .../common/errors/AuthorizationException.java   |  19 +
 .../common/network/SaslChannelBuilder.java      |   3 +-
 .../kafka/common/network/SslChannelBuilder.java |   3 +-
 .../common/requests/SyncGroupResponse.java      |   4 +
 .../kafka/common/security/kerberos/Login.java   |   3 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   4 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   2 +-
 .../kafka/common/AuthorizationException.scala   |   1 +
 .../scala/kafka/security/auth/Authorizer.scala  |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  62 +--
 .../kafka/api/AuthorizerIntegrationTest.scala   | 388 +++++++++++++++++++
 13 files changed, 477 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bc9ef21..cce13dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -779,6 +779,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
+     *
+     * @throws org.apache.kafka.common.errors.AuthorizationException if the caller does not have Read permission on the topic.
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
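
For consumers, this means poll() can now surface authorization failures directly. A minimal sketch of
client-side handling (illustrative, not part of the commit):

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.errors.AuthorizationException

    // Sketch: poll() throws AuthorizationException when a fetch response reports
    // AUTHORIZATION_FAILED for one of the assigned topics.
    def pollOnce(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit =
      try {
        val records = consumer.poll(100)
        // process records here...
      } catch {
        case e: AuthorizationException =>
          // e.getMessage lists the unauthorized topic-partitions,
          // e.g. "Not authorized to read from topic-0"
          System.err.println(s"Missing Read permission: ${e.getMessage}")
      }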

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e18a58b..4e0d5ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -23,10 +23,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.errors.InvalidMetadataException;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.*;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -57,7 +54,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.HashSet;
 
 /**
 * This class manages the fetching process with the brokers.
@@ -81,6 +78,7 @@ public class Fetcher<K, V> {
     private final Deserializer<V> valueDeserializer;
 
     private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
+    private final Set<TopicPartition> unauthorizedTopicPartitions;
     private final Map<TopicPartition, Long> recordTooLargePartitions;
 
     public Fetcher(ConsumerNetworkClient client,
@@ -112,6 +110,7 @@ public class Fetcher<K, V> {
 
         this.records = new LinkedList<PartitionRecords<K, V>>();
         this.offsetOutOfRangePartitions = new HashMap<>();
+        this.unauthorizedTopicPartitions = new HashSet<>();
         this.recordTooLargePartitions = new HashMap<>();
 
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
@@ -303,6 +302,19 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * If any topic from the previous fetch response contains an authorization error, throw an ApiException.
+     * @throws ApiException
+     */
+    private void throwIfUnauthorized() throws ApiException {
+        if (!unauthorizedTopicPartitions.isEmpty()) {
+            StringBuilder sb = new StringBuilder();
+            for (TopicPartition topicPartition : unauthorizedTopicPartitions)
+                sb.append(topicPartition + ",");
+            unauthorizedTopicPartitions.clear();
+            throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString()));
+        }
+    }
+    /**
      * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
      *
      * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
@@ -334,6 +346,7 @@ public class Fetcher<K, V> {
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
             throwIfOffsetOutOfRange();
+            throwIfUnauthorized();
             throwIfRecordTooLarge();
 
             for (PartitionRecords<K, V> part : this.records) {
@@ -544,6 +557,9 @@ public class Fetcher<K, V> {
                     else
                         this.offsetOutOfRangePartitions.put(tp, fetchOffset);
                     log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
+                } else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) {
+                    log.warn("Not authorized to read from topic {}.", tp.topic());
+                    unauthorizedTopicPartitions.add(tp);
                 } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                     log.warn("Unknown error fetching data for topic-partition {}", tp);
                 } else {
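
The new unauthorized-topic handling mirrors the existing offset-out-of-range and record-too-large
paths: errors are recorded per partition as fetch responses arrive, surfaced once on the next drain
of fetched records, and then cleared so the exception fires only for the poll that hit it. Distilled
into a standalone sketch (illustrative names, not the actual Fetcher fields):

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.AuthorizationException
    import scala.collection.mutable

    val unauthorized = mutable.Set[TopicPartition]()

    // Called while handling a fetch response that carries AUTHORIZATION_FAILED.
    def recordUnauthorized(tp: TopicPartition): Unit = unauthorized += tp

    // Called before returning records to the caller; clears state so the error
    // is raised once rather than on every subsequent poll.
    def throwIfUnauthorized(): Unit =
      if (unauthorized.nonEmpty) {
        val partitions = unauthorized.mkString(",")
        unauthorized.clear()
        throw new AuthorizationException(s"Not authorized to read from $partitions")
      }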

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
new file mode 100644
index 0000000..2a01e5e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class AuthorizationException extends ApiException {
+    public AuthorizationException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 4d52738..d50055a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -103,8 +103,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
         if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
             return SslTransportLayer.create(id, key,
-                sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
-                socketChannel.socket().getPort()));
+                sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()));
         } else {
             return new PlaintextTransportLayer(key);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 8edd37e..9a7ba0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -69,7 +69,6 @@ public class SslChannelBuilder implements ChannelBuilder {
     protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
         SocketChannel socketChannel = (SocketChannel) key.channel();
         return SslTransportLayer.create(id, key,
-            sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
-            socketChannel.socket().getPort()));
+            sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index a96b7e5..0eb92f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -68,4 +68,8 @@ public class SyncGroupResponse extends AbstractRequestResponse {
         return memberState;
     }
 
+    public static SyncGroupResponse parse(ByteBuffer buffer) {
+        return new SyncGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
 }
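
The static parse method is not dead code: the new integration test deserializes every response type
reflectively, so each response class needs a parse(ByteBuffer) entry point. Roughly what the test's
sendRequestAndVerifyResponseErrorCode does (a sketch, assuming the buffer is positioned past the
response header):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.requests.SyncGroupResponse

    def parseSyncGroupResponse(buffer: ByteBuffer): SyncGroupResponse =
      classOf[SyncGroupResponse]
        .getMethod("parse", classOf[ByteBuffer])
        .invoke(null, buffer)
        .asInstanceOf[SyncGroupResponse]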

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
index 470ab96..be91845 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
@@ -341,8 +341,7 @@ public class Login {
      * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
      * @throws javax.security.auth.login.LoginException on a failure
      */
-    private synchronized void reLogin()
-            throws LoginException {
+    private synchronized void reLogin() throws LoginException {
         if (!isKrbTicket) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 33c107f..6fb9e22 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -58,7 +58,7 @@ case class ControlledShutdownRequest(versionId: Short,
   def sizeInBytes: Int = {
     2 + /* version id */
       4 + /* correlation id */
-      clientId.fold(0)(shortStringLength)
+      clientId.fold(0)(shortStringLength) +
       4 /* broker id */
   }
 
@@ -67,7 +67,7 @@ case class ControlledShutdownRequest(versionId: Short,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
+    val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition])
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index d59de82..11c32cd 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -127,7 +127,7 @@ case class UpdateMetadataRequest (versionId: Short,
   }
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
+    val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
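
Both handleError fixes correct the same bug: ErrorMapping.codeFor takes the exception's class, but
the old code passed e.getCause, which is null for exceptions thrown directly (an authorization
failure, for instance), so the lookup could never succeed. In effect:

    import kafka.common.{AuthorizationException, ErrorMapping}

    // Illustrative: mapping a directly-thrown exception to its wire error code.
    // e.getCause would be null here, which is why the handlers now use e.getClass.
    val e: Throwable = new AuthorizationException("not authorized")
    val code: Short = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])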
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/common/AuthorizationException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala
index 009cf1a..12ee0fe 100644
--- a/core/src/main/scala/kafka/common/AuthorizationException.scala
+++ b/core/src/main/scala/kafka/common/AuthorizationException.scala
@@ -21,4 +21,5 @@ package kafka.common
  * @param message
  */
 class AuthorizationException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
 }
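
The no-arg constructor is presumably there so the exception can be instantiated reflectively when an
error code is mapped back to an exception instance, which requires a parameterless constructor. A
sketch, assuming that reflective path:

    import kafka.common.AuthorizationException

    // Class.newInstance needs the new no-arg constructor added above.
    val e: AuthorizationException = classOf[AuthorizationException].newInstance()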

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/security/auth/Authorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 8f1a660..939ed12 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -18,7 +18,6 @@
 package kafka.security.auth
 
 import kafka.network.RequestChannel.Session
-import kafka.server.KafkaConfig
 import org.apache.kafka.common.Configurable
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2ef9730..af6bb5e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -212,7 +212,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
 
-    if (offsetCommitRequest.versionId == 0) {
+    if (authorizedRequestInfo.isEmpty)
+      sendResponseCallback(Map.empty)
+    else if (offsetCommitRequest.versionId == 0) {
       // for version 0 always store offsets to ZK
       val responseInfo = authorizedRequestInfo.map {
         case (topicAndPartition, metaAndError) => {
@@ -339,22 +341,26 @@ class KafkaApis(val requestChannel: RequestChannel,
                                                                    produceResponseCallback)
     }
 
-    // only allow appending to internal topic partitions
-    // if the client is not from admin
-    val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
-
-    // call the replica manager to append messages to the replicas
-    replicaManager.appendMessages(
-      produceRequest.ackTimeoutMs.toLong,
-      produceRequest.requiredAcks,
-      internalTopicsAllowed,
-      authorizedRequestInfo,
-      sendResponseCallback)
-
-    // if the request is put into the purgatory, it will have a held reference
-    // and hence cannot be garbage collected; hence we clear its data here in
-    // order to let GC re-claim its memory since it is already appended to log
-    produceRequest.emptyData()
+    if (authorizedRequestInfo.isEmpty)
+      sendResponseCallback(Map.empty)
+    else {
+      // only allow appending to internal topic partitions
+      // if the client is not from admin
+      val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
+
+      // call the replica manager to append messages to the replicas
+      replicaManager.appendMessages(
+        produceRequest.ackTimeoutMs.toLong,
+        produceRequest.requiredAcks,
+        internalTopicsAllowed,
+        authorizedRequestInfo,
+        sendResponseCallback)
+
+      // if the request is put into the purgatory, it will have a held reference
+      // and hence cannot be garbage collected; hence we clear its data here in
+      // order to let GC re-claim its memory since it is already appended to log
+      produceRequest.emptyData()
+    }
   }
 
   /**
@@ -387,7 +393,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       def fetchResponseCallback(delayTimeMs: Int) {
-        val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs)
+        val response = FetchResponse(fetchRequest.correlationId, mergedResponseStatus, fetchRequest.versionId, delayTimeMs)
         requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
       }
 
@@ -403,13 +409,17 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    // call the replica manager to fetch messages from the local replica
-    replicaManager.fetchMessages(
-      fetchRequest.maxWait.toLong,
-      fetchRequest.replicaId,
-      fetchRequest.minBytes,
-      authorizedRequestInfo,
-      sendResponseCallback)
+    if (authorizedRequestInfo.isEmpty)
+      sendResponseCallback(Map.empty)
+    else {
+      // call the replica manager to fetch messages from the local replica
+      replicaManager.fetchMessages(
+        fetchRequest.maxWait.toLong,
+        fetchRequest.replicaId,
+        fetchRequest.minBytes,
+        authorizedRequestInfo,
+        sendResponseCallback)
+    }
   }
 
   /**
@@ -596,7 +606,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
 
-    val topicMetadata = getTopicMetadata(authorizedTopics, request.securityProtocol)
+    val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
     val brokers = metadataCache.getAliveBrokers
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
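
Two related guards recur in these KafkaApis hunks. First, the produce, fetch, and offset-commit
handlers now respond immediately when authorization filters out every partition, instead of invoking
the replica manager with an empty map. Second, the metadata handler checks for an empty authorized
set before calling getTopicMetadata, presumably because an empty topic list is interpreted as
"return all topics", which would hand back metadata the caller was just denied. A self-contained
distillation of the metadata case (illustrative, not broker code):

    // Split requested topics into authorized and denied; only look up the
    // authorized ones, and only when the set is non-empty.
    def describeTopics(requested: Set[String],
                       isAuthorized: String => Boolean,
                       lookup: Set[String] => Seq[String]): (Seq[String], Set[String]) = {
      val (authorized, denied) = requested.partition(isAuthorized)
      // Guard: lookup(Set.empty) would otherwise mean "every topic in the cluster".
      val metadata = if (authorized.isEmpty) Seq.empty else lookup(authorized)
      (metadata, denied) // denied topics are reported with an authorization error code
    }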

http://git-wip-us.apache.org/repos/asf/kafka/blob/1b5687b9/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
new file mode 100644
index 0000000..b1db12a
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.concurrent.ExecutionException
+import java.util.{ArrayList, Properties}
+
+import kafka.api.RequestKeys
+import kafka.cluster.EndPoint
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.coordinator.GroupCoordinator
+import kafka.integration.KafkaServerTestHarness
+import kafka.security.auth._
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Assert._
+import org.junit.{After, Assert, Before, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.Buffer
+
+class AuthorizerIntegrationTest extends KafkaServerTestHarness {
+  val topic = "topic"
+  val part = 0
+  val brokerId: Integer = 0
+  val correlationId = 0
+  val clientId = "client-Id"
+  val tp = new TopicPartition(topic, part)
+  val topicAndPartition = new TopicAndPartition(topic, part)
+  val group = "my-group"
+  val topicResource = new Resource(Topic, topic)
+  val groupResource = new Resource(Group, group)
+
+  val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+  val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+  val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+
+  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+  val numServers = 1
+  val producerCount = 1
+  val consumerCount = 2
+  val producerConfig = new Properties
+  val numRecords = 1
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+  overridingProps.put(KafkaConfig.BrokerIdProp, brokerId.toString)
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+
+  val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
+  
+  var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null
+
+  val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
+    Map(RequestKeys.MetadataKey -> classOf[MetadataResponse],
+      RequestKeys.ProduceKey -> classOf[ProduceResponse],
+      RequestKeys.FetchKey -> classOf[FetchResponse],
+      RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
+      RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
+      RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
+      RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+      RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
+      RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
+      RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
+      RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse],
+      RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse],
+      RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse],
+      RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse],
+      RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse]
+    )
+
+  val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
+    RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+    RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+    RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+    RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
+    RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
+    RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
+    RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()),
+    RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()),
+    RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode())
+  )
+
+  val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
+    RequestKeys.MetadataKey -> TopicDescribeAcl,
+    RequestKeys.ProduceKey -> TopicWriteAcl,
+    RequestKeys.FetchKey -> TopicReadAcl,
+    RequestKeys.OffsetsKey -> TopicDescribeAcl,
+    RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
+    RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
+    RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+    RequestKeys.UpdateMetadataKey -> ClusterAcl,
+    RequestKeys.JoinGroupKey -> GroupReadAcl,
+    RequestKeys.SyncGroupKey -> GroupReadAcl,
+    RequestKeys.HeartbeatKey -> GroupReadAcl,
+    RequestKeys.LeaveGroupKey -> GroupReadAcl,
+    RequestKeys.LeaderAndIsrKey -> ClusterAcl,
+    RequestKeys.StopReplicaKey -> ClusterAcl,
+    RequestKeys.ControlledShutdownKey -> ClusterAcl
+  )
+
+  // configure the servers and clients
+  override def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+
+    for (i <- 0 until producerCount)
+      producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+        acks = 1)
+    for (i <- 0 until consumerCount)
+      consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
+
+    // create the consumer offset topic
+    TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
+      1,
+      1,
+      servers,
+      servers.head.consumerCoordinator.offsetsTopicConfigs)
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
+
+    val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer",
+      List(new JoinGroupRequest.GroupProtocol("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava)
+
+    // we have to issue a join request so the group is created and we get back a memberId
+    addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+    val socket = new Socket("localhost", servers.head.boundPort())
+    val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse]
+    val memberId = joinResponse.memberId()
+
+    //remove group acls
+    removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+
+    RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
+      RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava),
+      RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava),
+      RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
+      RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
+      RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
+      RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+      RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
+        Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+        Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
+      RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer",
+        List(new JoinGroupRequest.GroupProtocol("consumer-range", ByteBuffer.wrap("test".getBytes()))).asJava),
+      RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava),
+      RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava),
+      RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId),
+      RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId),
+      RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue,
+        Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+        Set(new LeaderAndIsrRequest.EndPoint(brokerId, "localhost", 0)).asJava),
+      RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava),
+      RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId)
+    )
+  }
+
+  @After
+  override def tearDown() = {
+    removeAllAcls
+    super.tearDown()
+  }
+
+  @Test
+  def testAuthorization() {
+    val socket = new Socket("localhost", servers.head.boundPort())
+
+    for ((key, request) <- RequestKeyToRequest) {
+      removeAllAcls
+
+      sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode)
+
+      for ((resource, acls) <- RequestKeysToAcls(key))
+        addAndVerifyAcls(acls, resource)
+
+      sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError)
+    }
+  }
+
+  @Test
+  def testProduceNeedsAuthorization() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
+    try {
+      sendRecords(numRecords, tp)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage)
+    }
+  }
+
+  @Test
+  def testOnlyWritePermissionAllowsWritingToProducer() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(numRecords, tp)
+  }
+
+  @Test
+  def testCreatePermissionNeededForWritingToNonExistentTopic() {
+    val newTopic = "newTopic"
+    val topicPartition = new TopicPartition(newTopic, 0)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
+    try {
+      sendRecords(numRecords, topicPartition)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: TimeoutException =>
+      // TODO: need to update the producer so it actually throws the server-side exception.
+      case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e was thrown")
+    }
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create),
+      new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
+    sendRecords(numRecords, topicPartition)
+  }
+
+  @Test
+  def testConsumerNeedsAuthorization() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    // TODO: Ideally we would want to test that when the consumer group permission is not present we still get an AuthorizationException,
+    // but the consumer fetcher currently waits forever for the consumer metadata to become available.
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    sendRecords(1, tp)
+    try {
+      this.consumers.head.assign(List(tp).asJava)
+      consumeRecords(this.consumers.head)
+      Assert.fail("should have thrown exception")
+    } catch {
+      case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage)
+    }
+  }
+
+  @Test
+  def testAllowingReadOnTopicAndGroupAllowsReading() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
+    sendRecords(1, tp)
+    this.consumers.head.assign(List(tp).asJava)
+    consumeRecords(this.consumers.head)
+  }
+
+//    TODO: The following test goes into an infinite loop as the consumer waits forever for consumer metadata to be propagated.
+//    @Test
+//    def testCreatePermissionNeededToReadFromNonExistentTopic() {
+//      val newTopic = "newTopic"
+//      val topicPartition = new TopicPartition(newTopic, 0)
+//      val newTopicResource = new Resource(Topic, newTopic)
+//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
+//      addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
+//      addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
+//      try {
+//        this.consumers(0).assign(List(topicPartition).asJava)
+//        consumeRecords(this.consumers(0))
+//        Assert.fail("should have thrown exception")
+//      } catch {
+//        // checking both the message and the type so the test starts failing once these client-side issues are fixed.
+//        case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.")
+//      }
+//
+//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
+//      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
+//
+//      sendRecords(numRecords, topicPartition)
+//      consumeRecords(this.consumers(0))
+//    }
+
+  def removeAllAcls() = {
+    servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
+      servers.head.apis.authorizer.get.removeAcls(resource)
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+    }
+  }
+
+  def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = {
+    val header = new RequestHeader(key, "client", 1)
+    val body = request.toStruct
+
+    val buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf())
+    header.writeTo(buffer)
+    body.writeTo(buffer)
+    buffer.rewind()
+    val requestBytes = buffer.array()
+
+    sendRequest(socket, key, requestBytes)
+    val resp = receiveResponse(socket)
+    ResponseHeader.parse(resp)
+
+    val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
+    Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response))
+    response
+  }
+
+  private def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
+    val outgoing = new DataOutputStream(socket.getOutputStream)
+    outgoing.writeInt(request.length)
+    outgoing.write(request)
+    outgoing.flush()
+  }
+
+  private def receiveResponse(socket: Socket): ByteBuffer = {
+    val incoming = new DataInputStream(socket.getInputStream)
+    val len = incoming.readInt()
+    val response = new Array[Byte](len)
+    incoming.readFully(response)
+    ByteBuffer.wrap(response)
+  }
+
+  private def sendRecords(numRecords: Int, tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers.head.send(new ProducerRecord(tp.topic(), tp.partition(), i.toString.getBytes, i.toString.getBytes))
+    }
+    try {
+      futures.foreach(_.get)
+    } catch {
+      case e: ExecutionException => throw e.getCause
+    }
+  }
+
+  private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
+    servers.head.apis.authorizer.get.addAcls(acls, resource)
+    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+  }
+
+  private def removeAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
+    servers.head.apis.authorizer.get.removeAcls(acls, resource)
+    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
+  }
+
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int = 0) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 50
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50).asScala) {
+        records.add(record)
+      }
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+    }
+  }
+}
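
One piece of plumbing worth calling out: sendRequest and receiveResponse work because Kafka frames
every request and response as a 4-byte big-endian length prefix followed by the payload; header and
body layout come from the request schemas. The round trip, condensed into a standalone helper (a
sketch of the same technique the test uses):

    import java.io.{DataInputStream, DataOutputStream}
    import java.net.Socket
    import java.nio.ByteBuffer

    def roundTrip(socket: Socket, requestBytes: Array[Byte]): ByteBuffer = {
      val out = new DataOutputStream(socket.getOutputStream)
      out.writeInt(requestBytes.length) // 4-byte size prefix
      out.write(requestBytes)
      out.flush()
      val in = new DataInputStream(socket.getInputStream)
      val response = new Array[Byte](in.readInt()) // size prefix, then payload
      in.readFully(response)
      ByteBuffer.wrap(response)
    }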