You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/17 13:04:45 UTC

[1/2] kafka git commit: MINOR: A few cleanups in KafkaApis and TransactionMarkerChannelManager

Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b1313935f -> bc6a3bc6f


MINOR: A few cleanups in KafkaApis and TransactionMarkerChannelManager

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Colin P. Mccabe <cm...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3171 from hachikuji/minor-txn-channel-cleanups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63605779
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63605779
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63605779

Branch: refs/heads/0.11.0
Commit: 63605779ef67fb39487cf0487c8ac4caa8d39cbc
Parents: b131393
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Jun 17 02:03:25 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 14:04:32 2017 +0100

----------------------------------------------------------------------
 .../common/requests/DeleteAclsResponse.java     |   8 +
 .../kafka/common/InterBrokerSendThread.scala    |  29 +--
 .../TransactionMarkerChannelManager.scala       |  22 +--
 .../scala/kafka/security/SecurityUtils.scala    |  48 +++++
 .../kafka/security/auth/PermissionType.scala    |   5 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 187 ++++++-------------
 .../common/InterBrokerSendThreadTest.scala      |  16 +-
 .../TransactionMarkerChannelManagerTest.scala   |  20 +-
 8 files changed, 151 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 796e200..94cd6aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -49,6 +49,10 @@ public class DeleteAclsResponse extends AbstractResponse {
             this.acl = acl;
         }
 
+        public AclDeletionResult(AclBinding acl) {
+            this(null, acl);
+        }
+
         public ApiException exception() {
             return exception;
         }
@@ -72,6 +76,10 @@ public class DeleteAclsResponse extends AbstractResponse {
             this.deletions = deletions;
         }
 
+        public AclFilterResponse(Collection<AclDeletionResult> deletions) {
+            this(null, deletions);
+        }
+
         public Throwable throwable() {
             return throwable;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 886e41c..06158b2 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -27,24 +27,27 @@ import org.apache.kafka.common.utils.Time
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network client.
  */
-class InterBrokerSendThread(name: String,
-                            networkClient: NetworkClient,
-                            requestGenerator: () => Iterable[RequestAndCompletionHandler],
-                            time: Time,
-                            isInterruptible: Boolean = true)
+abstract class InterBrokerSendThread(name: String,
+                                     networkClient: NetworkClient,
+                                     time: Time,
+                                     isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
 
-  // visible for testing
-  def generateRequests(): Iterable[RequestAndCompletionHandler] = requestGenerator()
+  def generateRequests(): Iterable[RequestAndCompletionHandler]
+
+  override def shutdown(): Unit = {
+    initiateShutdown()
+    // wake up the thread in case it is blocked inside poll
+    networkClient.wakeup()
+    awaitShutdown()
+  }
 
   override def doWork() {
     val now = time.milliseconds()
     var pollTimeout = Long.MaxValue
 
     try {
-      val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator()
-
-      for (request: RequestAndCompletionHandler <- requestsToSend) {
+      for (request: RequestAndCompletionHandler <- generateRequests()) {
         val destination = Integer.toString(request.destination.id())
         val completionHandler = request.handler
         val clientRequest = networkClient.newClientRequest(destination,
@@ -79,6 +82,10 @@ class InterBrokerSendThread(name: String,
         throw new FatalExitError()
     }
   }
+
+  def wakeup(): Unit = networkClient.wakeup()
+
 }
 
-case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler)
\ No newline at end of file
+case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                       handler: RequestCompletionHandler)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 22f01c1..9c3ffd9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -122,16 +122,12 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                       networkClient: NetworkClient,
                                       txnStateManager: TransactionStateManager,
                                       txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
-                                      time: Time) extends Logging with KafkaMetricsGroup {
+                                      time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup {
 
   this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "
 
   private val interBrokerListenerName: ListenerName = config.interBrokerListenerName
 
-  private val txnMarkerSendThread: InterBrokerSendThread = {
-    new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time)
-  }
-
   private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
 
   private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
@@ -152,15 +148,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   )
 
-  def start(): Unit = {
-    txnMarkerSendThread.start()
-  }
+  override def generateRequests() = drainQueuedTransactionMarkers()
 
-  def shutdown(): Unit = {
-    txnMarkerSendThread.initiateShutdown()
-    // wake up the thread in case it is blocked inside poll
-    networkClient.wakeup()
-    txnMarkerSendThread.awaitShutdown()
+  override def shutdown(): Unit = {
+    super.shutdown()
     txnMarkerPurgatory.shutdown()
     markersQueuePerBroker.clear()
   }
@@ -173,9 +164,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   // visible for testing
   private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
 
-  // visible for testing
-  private[transaction] def senderThread = txnMarkerSendThread
-
   private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
     val brokerId = broker.id
 
@@ -369,7 +357,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       }
     }
 
-    networkClient.wakeup()
+    wakeup()
   }
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
new file mode 100644
index 0000000..bbfc42c
--- /dev/null
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -0,0 +1,48 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.security
+
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
+import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+import scala.util.Try
+
+
+object SecurityUtils {
+
+  def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = {
+    for {
+      resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
+      principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+      operation <- Try(Operation.fromJava(filter.entryFilter.operation))
+      permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
+      resource = Resource(resourceType, filter.resourceFilter.name)
+      acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
+    } yield (resource, acl)
+  }
+
+  def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
+    val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
+    val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
+      acl.operation.toJava, acl.permissionType.toJava)
+    new AclBinding(adminResource, entry)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index 686c60b..c603351 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -21,11 +21,6 @@ import org.apache.kafka.common.acl.AclPermissionType
 
 import scala.util.{Failure, Success, Try}
 
-/**
- * PermissionType.
- */
-
-
 sealed trait PermissionType extends BaseEnum {
   val toJava: AclPermissionType
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 27eb816..337c740 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
+import kafka.security.SecurityUtils
 import kafka.security.auth._
 import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
@@ -50,13 +51,12 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -1782,70 +1782,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  /**
-    * Convert an ACL binding filter to a Scala object.
-    * All ACL and resource fields must be specified (no UNKNOWN, ANY, or null fields are allowed.)
-    *
-    * @param filter     The binding filter as a Java object.
-    * @return           The binding filter as a scala object, or an exception if there was an error
-    *                   converting the Java object.
-    */
-  def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = {
-    filter.resourceFilter().resourceType() match {
-      case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type"))
-      case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type"))
-      case _ => {}
-    }
-    val resourceType: ResourceType = try {
-      ResourceType.fromJava(filter.resourceFilter.resourceType)
-    } catch {
-      case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type"))
-    }
-    val principal: KafkaPrincipal = try {
-      KafkaPrincipal.fromString(filter.entryFilter.principal)
-    } catch {
-      case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal"))
-    }
-    filter.entryFilter().operation() match {
-      case AclOperation.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN operation type"))
-      case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type"))
-      case _ => {}
-    }
-    val operation: Operation = try {
-      Operation.fromJava(filter.entryFilter.operation)
-    } catch {
-      case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
-    }
-    filter.entryFilter().permissionType() match {
-      case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type")
-      case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type")
-      case _ => {}
-    }
-    val permissionType: PermissionType = try {
-      PermissionType.fromJava(filter.entryFilter.permissionType)
-    } catch {
-      case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage))
-    }
-    return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType,
-                   filter.entryFilter().host(), operation)))
-  }
-
-  /**
-    * Convert a Scala ACL binding to a Java object.
-    *
-    * @param acl        The binding as a Scala object.
-    * @return           The binding as a Java object.
-    */
-  def toJava(acl: (Resource, Acl)) : AclBinding = {
-    acl match {
-      case (resource, acl) =>
-        val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
-        val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
-                                  acl.operation.toJava, acl.permissionType.toJava)
-        return new AclBinding(adminResource, entry)
-    }
-  }
-
   def handleCreateAcls(request: RequestChannel.Request): Unit = {
     authorizeClusterAlter(request)
     val createAclsRequest = request.body[CreateAclsRequest]
@@ -1855,11 +1791,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           createAclsRequest.getErrorResponse(requestThrottleMs,
             new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
-        val errors = mutable.HashMap[Int, Throwable]()
-        for (i <- 0 until createAclsRequest.aclCreations.size) {
-          val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
-          result match {
-            case Failure(throwable) => errors.put(i, throwable)
+        val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation =>
+          SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match {
+            case Failure(throwable) => new AclCreationResponse(throwable)
             case Success((resource, acl)) => try {
                 if (resource.resourceType.equals(Cluster) &&
                     !resource.name.equals(Resource.ClusterResourceName))
@@ -1868,25 +1802,19 @@ class KafkaApis(val requestChannel: RequestChannel,
                 if (resource.name.isEmpty)
                   throw new InvalidRequestException("Invalid empty resource name")
                 auth.addAcls(immutable.Set(acl), resource)
-                if (logger.isDebugEnabled)
-                  logger.debug(s"Added acl $acl to $resource")
+
+                logger.debug(s"Added acl $acl to $resource")
+
+                new AclCreationResponse(null)
               } catch {
-                case throwable : Throwable => if (logger.isDebugEnabled) {
-                    logger.debug(s"Failed to add acl $acl to $resource", throwable)
-                  }
-                  errors.put(i, throwable)
+                case throwable: Throwable =>
+                  logger.debug(s"Failed to add acl $acl to $resource", throwable)
+                  new AclCreationResponse(throwable)
               }
           }
         }
-        val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
-        for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
-          errors.get(i) match {
-            case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable))
-            case None => aclCreationResults.add(new AclCreationResponse(null))
-          }
-        }
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new CreateAclsResponse(requestThrottleMs, aclCreationResults))
+          new CreateAclsResponse(requestThrottleMs, aclCreationResults.asJava))
     }
   }
 
@@ -1899,60 +1827,53 @@ class KafkaApis(val requestChannel: RequestChannel,
           deleteAclsRequest.getErrorResponse(requestThrottleMs,
             new SecurityDisabledException("No Authorizer is configured on the broker.")))
       case Some(auth) =>
-        val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
-        val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
-        for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-          toDelete.put(i, new ListBuffer[(Resource, Acl)]())
-        }
-        if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) {
-          // Delete based on filters that may match more than one ACL.
-          val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
-          aclMap.foreach { case (resource, acls) =>
-            acls.foreach { acl =>
-              val binding = new AclBinding(new AdminResource(AdminResourceType.
-                fromString(resource.resourceType.toString), resource.name),
-                new AccessControlEntry(acl.principal.toString(), acl.host.toString(),
-                  acl.operation.toJava, acl.permissionType.toJava))
-              for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-                val filter = deleteAclsRequest.filters().get(i)
-                if (filter.matches(binding)) {
-                  toDelete.get(i).get += ((resource, acl))
-                }
-              }
+        val filters = deleteAclsRequest.filters.asScala
+        val filterResponseMap = mutable.Map[Int, AclFilterResponse]()
+        val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
+
+        if (filters.forall(_.matchesAtMostOne)) {
+          // Delete based on a list of ACL fixtures.
+          for ((filter, i) <- filters.zipWithIndex) {
+            SecurityUtils.convertToResourceAndAcl(filter) match {
+              case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava))
+              case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
             }
           }
         } else {
-          // Delete based on a list of ACL fixtures.
-          for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-            toScala(deleteAclsRequest.filters().get(i)) match {
-              case Failure(throwable) => filterResponseMap.put(i,
-                new AclFilterResponse(throwable, Collections.emptySet[AclDeletionResult]()))
-              case Success(fixture) => toDelete.put(i, ListBuffer(fixture))
-            }
+          // Delete based on filters that may match more than one ACL.
+          val aclMap = auth.getAcls()
+          val filtersWithIndex = filters.zipWithIndex
+          for ((resource, acls) <- aclMap; acl <- acls) {
+            val binding = new AclBinding(
+              new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+              new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
+                acl.permissionType.toJava))
+
+            for ((filter, i) <- filtersWithIndex if filter.matches(binding))
+              toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl))
           }
         }
-        for (i <- toDelete.keys) {
-          val deletionResults = new util.ArrayList[AclDeletionResult]()
-          for (acls <- toDelete.get(i)) {
-            for ((resource, acl) <- acls) {
-              try {
-                if (auth.removeAcls(immutable.Set(acl), resource)) {
-                  deletionResults.add(new AclDeletionResult(null, toJava((resource, acl))))
-                }
-              } catch {
-                case throwable: Throwable => deletionResults.add(new AclDeletionResult(
-                  new UnknownServerException("Failed to delete ACL: " + throwable.toString),
-                    toJava((resource, acl))))
-              }
+
+        for ((i, acls) <- toDelete) {
+          val deletionResults = acls.flatMap { case (resource, acl) =>
+            val aclBinding = SecurityUtils.convertToAclBinding(resource, acl)
+            try {
+              if (auth.removeAcls(immutable.Set(acl), resource))
+                Some(new AclDeletionResult(aclBinding))
+              else None
+            } catch {
+              case throwable: Throwable =>
+                Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"),
+                  aclBinding))
             }
-          }
-          filterResponseMap.put(i, new AclFilterResponse(null, deletionResults))
-        }
-        val filterResponses = new util.ArrayList[AclFilterResponse]
-        for (i <- 0 to deleteAclsRequest.filters().size() - 1) {
-          filterResponses.add(filterResponseMap.getOrElse(i,
-            new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]())))
+          }.asJava
+          
+          filterResponseMap.put(i, new AclFilterResponse(deletionResults))
         }
+
+        val filterResponses = filters.indices.map { i =>
+          filterResponseMap.getOrElse(i, new AclFilterResponse(Seq.empty.asJava))
+        }.asJava
         sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses))
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index a508c41..c6ebdd1 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -34,7 +34,9 @@ class InterBrokerSendThreadTest {
 
   @Test
   def shouldNotSendAnythingWhenNoRequests(): Unit = {
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => mutable.Iterable.empty, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = mutable.Iterable.empty
+    }
 
     // poll is always called but there should be no further invocations on NetworkClient
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
@@ -52,9 +54,9 @@ class InterBrokerSendThreadTest {
     val request = new StubRequestBuilder()
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => {
-      List[RequestAndCompletionHandler](handler)
-    }, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = List[RequestAndCompletionHandler](handler)
+    }
 
     val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, handler.handler)
 
@@ -86,9 +88,9 @@ class InterBrokerSendThreadTest {
     val request = new StubRequestBuilder
     val node = new Node(1, "", 8080)
     val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler)
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => {
-      List[RequestAndCompletionHandler](requestAndCompletionHandler)
-    }, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
+    }
 
     val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestAndCompletionHandler.handler)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 01a350b..9e7fb13 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -73,8 +73,6 @@ class TransactionMarkerChannelManagerTest {
     txnMarkerPurgatory,
     time)
 
-  private val senderThread = channelManager.senderThread
-
   private def mockCache(): Unit = {
     EasyMock.expect(txnStateManager.partitionFor(transactionalId1))
       .andReturn(txnTopicPartition1)
@@ -93,7 +91,7 @@ class TransactionMarkerChannelManagerTest {
 
   @Test
   def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = {
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertTrue(channelManager.generateRequests().isEmpty)
   }
 
   @Test
@@ -131,12 +129,12 @@ class TransactionMarkerChannelManagerTest {
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
       Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
-    val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+    val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
     assertEquals(Map(broker1 -> expectedBroker1Request, broker2 -> expectedBroker2Request), requests)
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertTrue(channelManager.generateRequests().isEmpty)
   }
 
   @Test
@@ -208,13 +206,13 @@ class TransactionMarkerChannelManagerTest {
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
       Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
-    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
     assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests)
 
-    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler =>
+    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler =>
       (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
@@ -294,7 +292,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -342,7 +340,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -396,7 +394,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -404,7 +402,7 @@ class TransactionMarkerChannelManagerTest {
     }
 
     // call this again so that append log will be retried
-    senderThread.generateRequests()
+    channelManager.generateRequests()
 
     EasyMock.verify(txnStateManager)
 


[2/2] kafka git commit: HOTFIX: Improve error handling for ACL requests

Posted by ij...@apache.org.
HOTFIX: Improve error handling for ACL requests

- Use ResourceType.toJava instead of ResourceType.fromString. The latter
doesn't work for TransactionalId (or any other type whose name consists
of two camel-case words).
- Replace Throwable with ApiError in response classes.
- Return an InvalidRequest error instead of an Unknown error if ANY or
UNKNOWN is provided during ACL creation.
- Rename `unknown()` to `isUnknown()` in a few places that
were missed previously.
- Add tests.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3364 from ijuma/acls-fromString-fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc6a3bc6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc6a3bc6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc6a3bc6

Branch: refs/heads/0.11.0
Commit: bc6a3bc6f8c74df5437005f0c1da7822fa10ed2b
Parents: 6360577
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sat Jun 17 14:02:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Jun 17 14:04:37 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   |  18 ++--
 .../org/apache/kafka/common/acl/AclBinding.java |   2 +-
 .../kafka/common/acl/AclBindingFilter.java      |   4 +-
 .../apache/kafka/common/requests/ApiError.java  |  10 ++
 .../common/requests/CreateAclsRequest.java      |   5 +-
 .../common/requests/CreateAclsResponse.java     |  33 ++----
 .../common/requests/DeleteAclsRequest.java      |   2 +-
 .../common/requests/DeleteAclsResponse.java     |  95 ++++++------------
 .../common/requests/DescribeAclsRequest.java    |   3 +-
 .../common/requests/DescribeAclsResponse.java   |  50 ++++------
 .../kafka/common/resource/ResourceFilter.java   |   2 +-
 .../clients/admin/KafkaAdminClientTest.java     | 100 ++++++-------------
 .../apache/kafka/common/acl/AclBindingTest.java |  14 +--
 .../common/requests/RequestResponseTest.java    |  31 +++---
 .../scala/kafka/security/SecurityUtils.scala    |  17 ++--
 .../main/scala/kafka/server/AdminManager.scala  |   8 +-
 .../kafka/server/DelayedCreateTopics.scala      |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  36 +++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |  96 ++++++++++--------
 .../api/SaslSslAdminClientIntegrationTest.scala |  32 ++++--
 .../AbstractCreateTopicsRequestTest.scala       |   4 +-
 .../processor/internals/StreamsKafkaClient.java |   2 +-
 22 files changed, 251 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e92b1d3..881f8d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1277,8 +1277,8 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DescribeAclsResponse response = (DescribeAclsResponse) abstractResponse;
-                if (response.throwable() != null) {
-                    future.completeExceptionally(response.throwable());
+                if (response.error().isFailure()) {
+                    future.completeExceptionally(response.error().exception());
                 } else {
                     future.complete(response.acls());
                 }
@@ -1330,8 +1330,8 @@ public class KafkaAdminClient extends AdminClient {
                             "The broker reported no creation result for the given ACL."));
                     } else {
                         AclCreationResponse creation = iter.next();
-                        if (creation.throwable() != null) {
-                            future.completeExceptionally(creation.throwable());
+                        if (creation.error().isFailure()) {
+                            future.completeExceptionally(creation.error().exception());
                         } else {
                             future.complete(null);
                         }
@@ -1378,12 +1378,12 @@ public class KafkaAdminClient extends AdminClient {
                             "The broker reported no deletion result for the given filter."));
                     } else {
                         AclFilterResponse deletion = iter.next();
-                        if (deletion.throwable() != null) {
-                            future.completeExceptionally(deletion.throwable());
+                        if (deletion.error().isFailure()) {
+                            future.completeExceptionally(deletion.error().exception());
                         } else {
                             List<FilterResult> filterResults = new ArrayList<>();
                             for (AclDeletionResult deletionResult : deletion.deletions()) {
-                                filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.exception()));
+                                filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.error().exception()));
                             }
                             future.complete(new FilterResults(filterResults));
                         }
@@ -1433,7 +1433,7 @@ public class KafkaAdminClient extends AdminClient {
                     ConfigResource configResource = entry.getKey();
                     KafkaFutureImpl<Config> future = entry.getValue();
                     DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
-                    if (!config.error().is(Errors.NONE)) {
+                    if (config.error().isFailure()) {
                         future.completeExceptionally(config.error().exception());
                         continue;
                     }
@@ -1469,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient {
                     DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
                     DescribeConfigsResponse.Config config = response.configs().get(resource);
 
-                    if (!config.error().is(Errors.NONE))
+                    if (config.error().isFailure())
                         brokerFuture.completeExceptionally(config.error().exception());
                     else {
                         List<ConfigEntry> configEntries = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index ea58434..d264ef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -48,7 +48,7 @@ public class AclBinding {
     /**
      * Return true if this binding has any UNKNOWN components.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return resource.isUnknown() || entry.isUnknown();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 807b730..64f16cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -56,8 +56,8 @@ public class AclBindingFilter {
     /**
      * Return true if this filter has any UNKNOWN components.
      */
-    public boolean unknown() {
-        return resourceFilter.unknown() || entryFilter.isUnknown();
+    public boolean isUnknown() {
+        return resourceFilter.isUnknown() || entryFilter.isUnknown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 26034eb..d712123 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.protocol.types.Struct;
  */
 public class ApiError {
 
+    public static final ApiError NONE = new ApiError(Errors.NONE, null);
+
     private static final String CODE_KEY_NAME = "error_code";
     private static final String MESSAGE_KEY_NAME = "error_message";
 
@@ -67,6 +69,14 @@ public class ApiError {
         return this.error == error;
     }
 
+    public boolean isFailure() {
+        return !isSuccess();
+    }
+
+    public boolean isSuccess() {
+        return is(Errors.NONE);
+    }
+
     public Errors error() {
         return error;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 757b5af..3598d4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -117,9 +117,8 @@ public class CreateAclsRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
                 List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
-                for (int i = 0; i < aclCreations.size(); i++) {
-                    responses.add(new CreateAclsResponse.AclCreationResponse(throwable));
-                }
+                for (int i = 0; i < aclCreations.size(); i++)
+                    responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
                 return new CreateAclsResponse(throttleTimeMs, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index c84b97c..1fc75da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,23 +25,21 @@ import java.util.List;
 
 public class CreateAclsResponse extends AbstractResponse {
     private final static String CREATION_RESPONSES = "creation_responses";
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
 
     public static class AclCreationResponse {
-        private final Throwable throwable;
+        private final ApiError error;
 
-        public AclCreationResponse(Throwable throwable) {
-            this.throwable = throwable;
+        public AclCreationResponse(ApiError error) {
+            this.error = error;
         }
 
-        public Throwable throwable() {
-            return throwable;
+        public ApiError error() {
+            return error;
         }
 
         @Override
         public String toString() {
-            return "(" + throwable + ")";
+            return "(" + error + ")";
         }
     }
 
@@ -60,14 +57,8 @@ public class CreateAclsResponse extends AbstractResponse {
         this.aclCreationResponses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
-            short errorCode = responseStruct.getShort(ERROR_CODE);
-            String errorMessage = responseStruct.getString(ERROR_MESSAGE);
-            if (errorCode != 0) {
-                this.aclCreationResponses.add(new AclCreationResponse(
-                        Errors.forCode(errorCode).exception(errorMessage)));
-            } else {
-                this.aclCreationResponses.add(new AclCreationResponse(null));
-            }
+            ApiError error = new ApiError(responseStruct);
+            this.aclCreationResponses.add(new AclCreationResponse(error));
         }
     }
 
@@ -78,13 +69,7 @@ public class CreateAclsResponse extends AbstractResponse {
         List<Struct> responseStructs = new ArrayList<>();
         for (AclCreationResponse response : aclCreationResponses) {
             Struct responseStruct = struct.instance(CREATION_RESPONSES);
-            if (response.throwable() == null) {
-                responseStruct.set(ERROR_CODE, (short) 0);
-            } else {
-                Errors errors = Errors.forException(response.throwable());
-                responseStruct.set(ERROR_CODE, errors.code());
-                responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
-            }
+            response.error.write(responseStruct);
             responseStructs.add(responseStruct);
         }
         struct.set(CREATION_RESPONSES, responseStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 246b5e5..c05bec6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -96,7 +96,7 @@ public class DeleteAclsRequest extends AbstractRequest {
                 List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
                 for (int i = 0; i < filters.size(); i++) {
                     responses.add(new DeleteAclsResponse.AclFilterResponse(
-                        throwable, Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+                        ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
                 }
                 return new DeleteAclsResponse(throttleTimeMs, responses);
             default:

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 94cd6aa..973aa8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,9 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
@@ -30,31 +28,28 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 public class DeleteAclsResponse extends AbstractResponse {
     public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
     private final static String FILTER_RESPONSES = "filter_responses";
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
     private final static String MATCHING_ACLS = "matching_acls";
 
     public static class AclDeletionResult {
-        private final ApiException exception;
+        private final ApiError error;
         private final AclBinding acl;
 
-        public AclDeletionResult(ApiException exception, AclBinding acl) {
-            this.exception = exception;
+        public AclDeletionResult(ApiError error, AclBinding acl) {
+            this.error = error;
             this.acl = acl;
         }
 
         public AclDeletionResult(AclBinding acl) {
-            this(null, acl);
+            this(ApiError.NONE, acl);
         }
 
-        public ApiException exception() {
-            return exception;
+        public ApiError error() {
+            return error;
         }
 
         public AclBinding acl() {
@@ -63,25 +58,25 @@ public class DeleteAclsResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(apiException=" + exception + ", acl=" + acl + ")";
+            return "(error=" + error + ", acl=" + acl + ")";
         }
     }
 
     public static class AclFilterResponse {
-        private final Throwable throwable;
+        private final ApiError error;
         private final Collection<AclDeletionResult> deletions;
 
-        public AclFilterResponse(Throwable throwable, Collection<AclDeletionResult> deletions) {
-            this.throwable = throwable;
+        public AclFilterResponse(ApiError error, Collection<AclDeletionResult> deletions) {
+            this.error = error;
             this.deletions = deletions;
         }
 
         public AclFilterResponse(Collection<AclDeletionResult> deletions) {
-            this(null, deletions);
+            this(ApiError.NONE, deletions);
         }
 
-        public Throwable throwable() {
-            return throwable;
+        public ApiError error() {
+            return error;
         }
 
         public Collection<AclDeletionResult> deletions() {
@@ -90,7 +85,7 @@ public class DeleteAclsResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(throwable=" + throwable + ", deletions=" + Utils.join(deletions, ",") + ")";
+            return "(error=" + error + ", deletions=" + Utils.join(deletions, ",") + ")";
         }
     }
 
@@ -108,29 +103,16 @@ public class DeleteAclsResponse extends AbstractResponse {
         this.responses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
-            short responseErrorCode = responseStruct.getShort(ERROR_CODE);
-            String responseErrorMessage = responseStruct.getString(ERROR_MESSAGE);
-            if (responseErrorCode != 0) {
-                this.responses.add(new AclFilterResponse(
-                    Errors.forCode(responseErrorCode).exception(responseErrorMessage),
-                    Collections.<AclDeletionResult>emptySet()));
-            } else {
-                List<AclDeletionResult> deletions = new ArrayList<>();
-                for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
-                    Struct matchingAclStruct = (Struct) matchingAclStructObj;
-                    short matchErrorCode = matchingAclStruct.getShort(ERROR_CODE);
-                    ApiException exception = null;
-                    if (matchErrorCode != 0) {
-                        Errors errors = Errors.forCode(matchErrorCode);
-                        String matchErrorMessage = matchingAclStruct.getString(ERROR_MESSAGE);
-                        exception = errors.exception(matchErrorMessage);
-                    }
-                    AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
-                    Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
-                    deletions.add(new AclDeletionResult(exception, new AclBinding(resource, entry)));
-                }
-                this.responses.add(new AclFilterResponse(null, deletions));
+            ApiError error = new ApiError(responseStruct);
+            List<AclDeletionResult> deletions = new ArrayList<>();
+            for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
+                Struct matchingAclStruct = (Struct) matchingAclStructObj;
+                ApiError matchError = new ApiError(matchingAclStruct);
+                AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
+                Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
+                deletions.add(new AclDeletionResult(matchError, new AclBinding(resource, entry)));
             }
+            this.responses.add(new AclFilterResponse(error, deletions));
         }
     }
 
@@ -141,29 +123,16 @@ public class DeleteAclsResponse extends AbstractResponse {
         List<Struct> responseStructs = new ArrayList<>();
         for (AclFilterResponse response : responses) {
             Struct responseStruct = struct.instance(FILTER_RESPONSES);
-            if (response.throwable() != null) {
-                Errors error = Errors.forException(response.throwable());
-                responseStruct.set(ERROR_CODE, error.code());
-                responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
-                responseStruct.set(MATCHING_ACLS, new Struct[0]);
-            } else {
-                responseStruct.set(ERROR_CODE, (short) 0);
-                List<Struct> deletionStructs = new ArrayList<>();
-                for (AclDeletionResult deletion : response.deletions()) {
-                    Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
-                    if (deletion.exception() != null) {
-                        Errors error = Errors.forException(deletion.exception);
-                        deletionStruct.set(ERROR_CODE, error.code());
-                        deletionStruct.set(ERROR_MESSAGE, deletion.exception.getMessage());
-                    } else {
-                        deletionStruct.set(ERROR_CODE, (short) 0);
-                    }
-                    RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
-                    RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
-                    deletionStructs.add(deletionStruct);
-                }
-                responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
+            response.error.write(responseStruct);
+            List<Struct> deletionStructs = new ArrayList<>();
+            for (AclDeletionResult deletion : response.deletions()) {
+                Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
+                deletion.error.write(deletionStruct);
+                RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
+                RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
+                deletionStructs.add(deletionStruct);
             }
+            responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
             responseStructs.add(responseStruct);
         }
         struct.set(FILTER_RESPONSES, responseStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 6573b6e..58ce539 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -73,7 +73,8 @@ public class DescribeAclsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new DescribeAclsResponse(throttleTimeMs, throwable, Collections.<AclBinding>emptySet());
+                return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable),
+                        Collections.<AclBinding>emptySet());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index cf21aa6..993a45f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -20,51 +20,41 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DescribeAclsResponse extends AbstractResponse {
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
     private final static String RESOURCES = "resources";
     private final static String ACLS = "acls";
 
     private final int throttleTimeMs;
-    private final Throwable throwable;
+    private final ApiError error;
     private final Collection<AclBinding> acls;
 
-    public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, Collection<AclBinding> acls) {
+    public DescribeAclsResponse(int throttleTimeMs, ApiError error, Collection<AclBinding> acls) {
         this.throttleTimeMs = throttleTimeMs;
-        this.throwable = throwable;
+        this.error = error;
         this.acls = acls;
     }
 
     public DescribeAclsResponse(Struct struct) {
         this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
-        if (error != Errors.NONE) {
-            this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
-            this.acls = Collections.emptySet();
-        } else {
-            this.throwable = null;
-            this.acls = new ArrayList<>();
-            for (Object resourceStructObj : struct.getArray(RESOURCES)) {
-                Struct resourceStruct = (Struct) resourceStructObj;
-                Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
-                for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
-                    Struct aclDataStruct = (Struct) aclDataStructObj;
-                    AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct);
-                    this.acls.add(new AclBinding(resource, entry));
-                }
+        this.error = new ApiError(struct);
+        this.acls = new ArrayList<>();
+        for (Object resourceStructObj : struct.getArray(RESOURCES)) {
+            Struct resourceStruct = (Struct) resourceStructObj;
+            Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
+            for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
+                Struct aclDataStruct = (Struct) aclDataStructObj;
+                AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct);
+                this.acls.add(new AclBinding(resource, entry));
             }
         }
     }
@@ -73,15 +63,8 @@ public class DescribeAclsResponse extends AbstractResponse {
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
         struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        if (throwable != null) {
-            Errors errors = Errors.forException(throwable);
-            struct.set(ERROR_CODE, errors.code());
-            struct.set(ERROR_MESSAGE, throwable.getMessage());
-            struct.set(RESOURCES, new Struct[0]);
-            return struct;
-        }
-        struct.set(ERROR_CODE, (short) 0);
-        struct.set(ERROR_MESSAGE, null);
+        error.write(struct);
+
         Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>();
         for (AclBinding acl : acls) {
             List<AccessControlEntry> entry = resourceToData.get(acl.resource());
@@ -91,6 +74,7 @@ public class DescribeAclsResponse extends AbstractResponse {
             }
             entry.add(acl.entry());
         }
+
         List<Struct> resourceStructs = new ArrayList<>();
         for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) {
             Resource resource = tuple.getKey();
@@ -113,8 +97,8 @@ public class DescribeAclsResponse extends AbstractResponse {
         return throttleTimeMs;
     }
 
-    public Throwable throwable() {
-        return throwable;
+    public ApiError error() {
+        return error;
     }
 
     public Collection<AclBinding> acls() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 5032660..0a4611f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -70,7 +70,7 @@ public class ResourceFilter {
     /**
      * Return true if this ResourceFilter has any UNKNOWN components.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return resourceType.isUnknown();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cd6ed6b..8300e0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -48,8 +48,6 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -178,7 +177,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, TimeoutException.class);
         }
@@ -192,7 +191,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))),
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }
@@ -215,21 +214,18 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
-            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
-                new ArrayList<AclBinding>() {{
-                        add(ACL1);
-                        add(ACL2);
-                    }}));
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
+                    asList(ACL1, ACL2)));
             assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2);
 
             // Test a call where we get back no results.
-            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
                 Collections.<AclBinding>emptySet()));
             assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
 
             // Test a call where we get back an error.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
-                new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
+                new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet()));
             assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
         }
     }
@@ -243,30 +239,19 @@ public class KafkaAdminClientTest {
 
             // Test a call where we successfully create two ACLs.
             env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
-                new ArrayList<AclCreationResponse>() {{
-                        add(new AclCreationResponse(null));
-                        add(new AclCreationResponse(null));
-                    }}));
-            CreateAclsResult results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
-                        add(ACL1);
-                        add(ACL2);
-                    }});
+                asList(new AclCreationResponse(ApiError.NONE), new AclCreationResponse(ApiError.NONE))));
+            CreateAclsResult results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
-            for (KafkaFuture<Void> future : results.values().values()) {
+            for (KafkaFuture<Void> future : results.values().values())
                 future.get();
-            }
             results.all().get();
 
             // Test a call where we fail to create one ACL.
-            env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
-                    new ArrayList<AclCreationResponse>() {{
-                        add(new AclCreationResponse(new SecurityDisabledException("Security is disabled")));
-                        add(new AclCreationResponse(null));
-                    }}));
-            results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{
-                    add(ACL1);
-                    add(ACL2);
-                }});
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList(
+                new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED, "Security is disabled")),
+                new AclCreationResponse(ApiError.NONE))
+            ));
+            results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
             assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
             results.values().get(ACL2).get();
@@ -282,19 +267,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                            new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                                add(new AclDeletionResult(null, ACL2));
-                            }}));
-                    add(new AclFilterResponse(new SecurityDisabledException("No security"),
-                        Collections.<AclDeletionResult>emptySet()));
-                }}));
-            DeleteAclsResult results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{
-                        add(FILTER1);
-                        add(FILTER2);
-                    }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1), new AclDeletionResult(ACL2))),
+                    new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
+                            Collections.<AclDeletionResult>emptySet()))));
+            DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
             FilterResults filter1Results = filterResults.get(FILTER1).get();
             assertEquals(null, filter1Results.values().get(0).exception());
@@ -305,38 +282,19 @@ public class KafkaAdminClientTest {
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                                add(new AclDeletionResult(new SecurityDisabledException("No security"), ACL2));
-                            }}));
-                    add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet()));
-                }}));
-            results = env.adminClient().deleteAcls(
-                    new ArrayList<AclBindingFilter>() {{
-                            add(FILTER1);
-                            add(FILTER2);
-                        }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1),
+                            new AclDeletionResult(new ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))),
+                    new AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
+            results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             assertTrue(results.values().get(FILTER2).get().values().isEmpty());
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                            }}));
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL2));
-                            }}));
-                }}));
-            results = env.adminClient().deleteAcls(
-                    new ArrayList<AclBindingFilter>() {{
-                        add(FILTER1);
-                        add(FILTER2);
-                    }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1))),
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL2))))));
+            results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Collection<AclBinding> deleted = results.all().get();
             assertCollectionIs(deleted, ACL1, ACL2);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index e0a0598..0ebcdfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -92,13 +92,13 @@ public class AclBindingTest {
 
     @Test
     public void testUnknowns() throws Exception {
-        assertFalse(ACL1.unknown());
-        assertFalse(ACL2.unknown());
-        assertFalse(ACL3.unknown());
-        assertFalse(ANY_ANONYMOUS.unknown());
-        assertFalse(ANY_DENY.unknown());
-        assertFalse(ANY_MYTOPIC.unknown());
-        assertTrue(UNKNOWN_ACL.unknown());
+        assertFalse(ACL1.isUnknown());
+        assertFalse(ACL2.isUnknown());
+        assertFalse(ACL3.isUnknown());
+        assertFalse(ANY_ANONYMOUS.isUnknown());
+        assertFalse(ANY_DENY.isUnknown());
+        assertFalse(ANY_MYTOPIC.isUnknown());
+        assertTrue(UNKNOWN_ACL.isUnknown());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 327f228..467afb3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -226,7 +225,7 @@ public class RequestResponseTest {
         checkResponse(createTxnOffsetCommitResponse(), 0);
         checkRequest(createListAclsRequest());
         checkErrorResponse(createListAclsRequest(), new SecurityDisabledException("Security is not enabled."));
-        checkResponse(createListAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion());
+        checkResponse(createDescribeAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion());
         checkRequest(createCreateAclsRequest());
         checkErrorResponse(createCreateAclsRequest(), new SecurityDisabledException("Security is not enabled."));
         checkResponse(createCreateAclsResponse(), ApiKeys.CREATE_ACLS.latestVersion());
@@ -1001,8 +1000,8 @@ public class RequestResponseTest {
                 new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build();
     }
 
-    private DescribeAclsResponse createListAclsResponse() {
-        return new DescribeAclsResponse(0, null, Collections.singleton(new AclBinding(
+    private DescribeAclsResponse createDescribeAclsResponse() {
+        return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding(
             new Resource(ResourceType.TOPIC, "mytopic"),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))));
     }
@@ -1019,8 +1018,8 @@ public class RequestResponseTest {
     }
 
     private CreateAclsResponse createCreateAclsResponse() {
-        return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(null),
-            new AclCreationResponse(new InvalidRequestException("Foo bar"))));
+        return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(ApiError.NONE),
+            new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo bar"))));
     }
 
     private DeleteAclsRequest createDeleteAclsRequest() {
@@ -1036,16 +1035,14 @@ public class RequestResponseTest {
 
     private DeleteAclsResponse createDeleteAclsResponse() {
         List<AclFilterResponse> responses = new ArrayList<>();
-        responses.add(new AclFilterResponse(null,
-            new HashSet<AclDeletionResult>() {{
-                    add(new AclDeletionResult(null, new AclBinding(
+        responses.add(new AclFilterResponse(Utils.mkSet(
+                new AclDeletionResult(new AclBinding(
                         new Resource(ResourceType.TOPIC, "mytopic3"),
-                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))));
-                    add(new AclDeletionResult(null, new AclBinding(
+                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
+                new AclDeletionResult(new AclBinding(
                         new Resource(ResourceType.TOPIC, "mytopic4"),
-                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))));
-                }}));
-        responses.add(new AclFilterResponse(new SecurityDisabledException("No security"),
+                        new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY))))));
+        responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"),
             Collections.<AclDeletionResult>emptySet()));
         return new DeleteAclsResponse(0, responses);
     }
@@ -1071,9 +1068,9 @@ public class RequestResponseTest {
                 new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true)
         );
         configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
-                new ApiError(Errors.NONE, null), configEntries));
+                ApiError.NONE, configEntries));
         configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config(
-                new ApiError(Errors.NONE, null), Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
+                ApiError.NONE, Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
         return new DescribeConfigsResponse(200, configs);
     }
 
@@ -1091,7 +1088,7 @@ public class RequestResponseTest {
 
     private AlterConfigsResponse createAlterConfigsResponse() {
         Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>();
-        errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new ApiError(Errors.NONE, null));
+        errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), ApiError.NONE);
         errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
         return new AlterConfigsResponse(20, errors);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index bbfc42c..573a16b 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -19,27 +19,32 @@ package kafka.security
 
 import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 
 object SecurityUtils {
 
-  def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = {
-    for {
+  def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
+    (for {
       resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
       resource = Resource(resourceType, filter.resourceFilter.name)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
-    } yield (resource, acl)
+    } yield (resource, acl)) match {
+      case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
+      case Success(s) => Right(s)
+    }
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name)
+    val adminResource = new AdminResource(resource.resourceType.toJava, resource.name)
     val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(adminResource, entry)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 33c6b77..84972f3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -119,7 +119,7 @@ class AdminManager(val config: KafkaConfig,
             else
               AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
         }
-        CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, null))
+        CreateTopicMetadata(topic, assignments, ApiError.NONE)
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e@ (_: PolicyViolationException | _: ApiException) =>
@@ -135,7 +135,7 @@ class AdminManager(val config: KafkaConfig,
     if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
       val results = metadata.map { createTopicMetadata =>
         // ignore topics that already have errors
-        if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
+        if (createTopicMetadata.error.isSuccess() && !validateOnly) {
           (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
         } else {
           (createTopicMetadata.topic, createTopicMetadata.error)
@@ -212,7 +212,7 @@ class AdminManager(val config: KafkaConfig,
           new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly)
         }
 
-        new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), configEntries.asJava)
+        new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
       }
 
       try {
@@ -280,7 +280,7 @@ class AdminManager(val config: KafkaConfig,
                 else
                   AdminUtils.changeTopicConfig(zkUtils, topic, properties)
             }
-            resource -> new ApiError(Errors.NONE, null)
+            resource -> ApiError.NONE
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index abf6bc0..83cdd67 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -48,7 +48,7 @@ class DelayedCreateTopics(delayMs: Long,
   override def tryComplete() : Boolean = {
     trace(s"Trying to complete operation for $createMetadata")
 
-    val leaderlessPartitionCount = createMetadata.filter(_.error.is(Errors.NONE))
+    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
       .foldLeft(0) { case (topicCounter, metadata) =>
         topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
       }
@@ -69,7 +69,7 @@ class DelayedCreateTopics(delayMs: Long,
     trace(s"Completing operation for $createMetadata")
     val results = createMetadata.map { metadata =>
       // ignore topics that already have errors
-      if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
+      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
         (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
       else
         (metadata.topic, metadata.error)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 337c740..a4fd30c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 
 import scala.collection._
@@ -1763,22 +1763,19 @@ class KafkaApis(val requestChannel: RequestChannel,
       case None =>
         sendResponseMaybeThrottle(request, requestThrottleMs =>
           new DescribeAclsResponse(requestThrottleMs,
-            new SecurityDisabledException("No Authorizer is configured on the broker."),
-            Collections.emptySet()))
+            new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), Collections.emptySet()))
       case Some(auth) =>
         val filter = describeAclsRequest.filter()
-        val returnedAcls = new util.ArrayList[AclBinding]
-        val aclMap = auth.getAcls()
-        aclMap.foreach { case (resource, acls) =>
-          acls.foreach { acl =>
-            val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+        val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) =>
+          acls.flatMap { acl =>
+            val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name),
                 new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
-            if (filter.matches(fixture))
-              returnedAcls.add(fixture)
+            if (filter.matches(fixture)) Some(fixture)
+            else None
           }
         }
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new DescribeAclsResponse(requestThrottleMs, null, returnedAcls))
+          new DescribeAclsResponse(requestThrottleMs, ApiError.NONE, returnedAcls.asJava))
     }
   }
 
@@ -1793,8 +1790,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(auth) =>
         val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation =>
           SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match {
-            case Failure(throwable) => new AclCreationResponse(throwable)
-            case Success((resource, acl)) => try {
+            case Left(apiError) => new AclCreationResponse(apiError)
+            case Right((resource, acl)) => try {
                 if (resource.resourceType.equals(Cluster) &&
                     !resource.name.equals(Resource.ClusterResourceName))
                   throw new InvalidRequestException("The only valid name for the CLUSTER resource is " +
@@ -1805,11 +1802,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
                 logger.debug(s"Added acl $acl to $resource")
 
-                new AclCreationResponse(null)
+                new AclCreationResponse(ApiError.NONE)
               } catch {
                 case throwable: Throwable =>
                   logger.debug(s"Failed to add acl $acl to $resource", throwable)
-                  new AclCreationResponse(throwable)
+                  new AclCreationResponse(ApiError.fromThrowable(throwable))
               }
           }
         }
@@ -1835,8 +1832,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           // Delete based on a list of ACL fixtures.
           for ((filter, i) <- filters.zipWithIndex) {
             SecurityUtils.convertToResourceAndAcl(filter) match {
-              case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava))
-              case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
+              case Left(apiError) => filterResponseMap.put(i, new AclFilterResponse(apiError, Seq.empty.asJava))
+              case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
             }
           }
         } else {
@@ -1845,7 +1842,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name),
+              new AdminResource(resource.resourceType.toJava, resource.name),
               new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
@@ -1863,8 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               else None
             } catch {
               case throwable: Throwable =>
-                Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"),
-                  aclBinding))
+                Some(new AclDeletionResult(ApiError.fromThrowable(throwable), aclBinding))
             }
           }.asJava
           

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bd04c7b..09ff9be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
@@ -22,14 +22,24 @@ import kafka.common.TopicAndPartition
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
+import kafka.admin.AdminUtils
+import kafka.log.LogConfig
+import kafka.network.SocketServer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
@@ -38,13 +48,6 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
-import org.apache.kafka.common.KafkaException
-import kafka.admin.AdminUtils
-import kafka.log.LogConfig
-import kafka.network.SocketServer
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -72,6 +75,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
   val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
+  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
   val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
@@ -125,7 +130,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse],
       ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse],
       ApiKeys.END_TXN -> classOf[EndTxnResponse],
-      ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse]
+      ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse],
+      ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
+      ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
+      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -156,7 +164,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
     ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
     ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
-    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp))
+    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
+    ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
+    ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
+    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -185,7 +196,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl),
     ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl),
     ApiKeys.END_TXN -> transactionIdWriteAcl,
-    ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl)
+    ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl),
+    ApiKeys.CREATE_ACLS -> clusterAlterAcl,
+    ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
+    ApiKeys.DELETE_ACLS -> clusterAlterAcl
   )
 
   @Before
@@ -284,46 +298,47 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
-  private def createHeartbeatRequest = {
-    new HeartbeatRequest.Builder(group, 1, "").build()
-  }
+  private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
 
-  private def createLeaveGroupRequest = {
-    new LeaveGroupRequest.Builder(group, "").build()
-  }
+  private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
 
-  private def createLeaderAndIsrRequest = {
+  private def leaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
       Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 
-  private def createStopReplicaRequest = {
-    new requests.StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
-  }
+  private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
 
-  private def createControlledShutdownRequest = {
-    new requests.ControlledShutdownRequest.Builder(brokerId).build()
-  }
+  private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build()
 
-  private def createTopicsRequest = {
+  private def createTopicsRequest =
     new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
-  }
 
-  private def deleteTopicsRequest = {
-    new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
-  }
+  private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
 
-  private def createDescribeConfigsRequest =
+  private def describeConfigsRequest =
     new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
 
-  private def createAlterConfigsRequest =
+  private def alterConfigsRequest =
     new AlterConfigsRequest.Builder(
       Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
         new AlterConfigsRequest.Config(Collections.singleton(
           new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
         ))), true).build()
 
+  private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
+
+  private def createAclsRequest = new CreateAclsRequest.Builder(
+    Collections.singletonList(new AclCreation(new AclBinding(
+      new AdminResource(AdminResourceType.TOPIC, "mytopic"),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
+
+  private def deleteAclsRequest = new DeleteAclsRequest.Builder(
+    Collections.singletonList(new AclBindingFilter(
+      new ResourceFilter(AdminResourceType.TOPIC, null),
+      new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
+
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -338,16 +353,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
       ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
-      ApiKeys.HEARTBEAT -> createHeartbeatRequest,
-      ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest,
-      ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest,
-      ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
+      ApiKeys.HEARTBEAT -> heartbeatRequest,
+      ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
+      ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
+      ApiKeys.STOP_REPLICA -> stopReplicaRequest,
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
-      ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest,
-      ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest
+      ApiKeys.DESCRIBE_CONFIGS -> describeConfigsRequest,
+      ApiKeys.ALTER_CONFIGS -> alterConfigsRequest,
+      ApiKeys.CREATE_ACLS -> createAclsRequest,
+      ApiKeys.DELETE_ACLS -> deleteAclsRequest,
+      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index b4e09b3..03afc9e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -31,13 +31,13 @@ import scala.util.{Failure, Success, Try}
 
 class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
+  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
   override def configureSecurityBeforeServersStart() {
-    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName())
+    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
     try {
       authorizer.configure(this.configs.head.originals())
       authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow,
@@ -92,6 +92,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
   val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+  val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
 
   @Test
   override def testAclOperations(): Unit = {
@@ -116,28 +118,33 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     TestUtils.waitUntilTrue(() => {
       val results = client.describeAcls(filter).values.get()
       acls == results.asScala.toSet
-    }, "timed out waiting for ACLs")
+    }, s"timed out waiting for ACLs $acls")
   }
 
   @Test
   def testAclOperations2(): Unit = {
     client = AdminClient.create(createConfig())
-    val results = client.createAcls(List(acl2, acl2).asJava)
-    assertEquals(Set(acl2, acl2), results.values.keySet().asScala)
+    val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
+    assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
     results.all.get()
     waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
+    waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
 
     val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
     val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
+    val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY)
 
     waitForDescribeAcls(client, filterA, Set())
+    waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
 
-    val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
-    assertEquals(Set(filterA, filterB), results2.values.keySet().asScala)
+    val results2 = client.deleteAcls(List(filterA, filterB, filterC).asJava, new DeleteAclsOptions())
+    assertEquals(Set(filterA, filterB, filterC), results2.values.keySet.asScala)
     assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
+    assertEquals(Set(transactionalIdAcl), results2.values.get(filterC).get.values.asScala.map(_.binding).toSet)
     assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
 
     waitForDescribeAcls(client, filterB, Set())
+    waitForDescribeAcls(client, filterC, Set())
   }
 
   @Test
@@ -161,7 +168,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions)
+      val result = client.createAcls(List(fooAcl, transactionalIdAcl).asJava, new CreateAclsOptions)
       if (expectAuth) {
         Try(result.all.get) match {
           case Failure(e) =>
@@ -180,9 +187,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail"))
     if (expectAuth) {
       waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl))
+      waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
     }
     TestUtils.waitUntilTrue(() => {
-      val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions)
+      val result = client.deleteAcls(List(fooAcl.toFilter, transactionalIdAcl.toFilter).asJava, new DeleteAclsOptions)
       if (expectAuth) {
         Try(result.all.get) match {
           case Failure(e) =>
@@ -196,13 +204,17 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
             verifyCauseIsClusterAuth(e)
             true
           case Success(_) =>
+            assertEquals(Set(fooAcl, transactionalIdAcl), result.values.keySet)
             assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet)
+            assertEquals(Set(transactionalIdAcl),
+              result.values.get(transactionalIdAcl.toFilter).get.values.asScala.map(_.binding).toSet)
             true
         }
       }
     }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail"))
     if (expectAuth) {
-      waitForDescribeAcls(client, fooAcl.toFilter, Set())
+      waitForDescribeAcls(client, fooAcl.toFilter, Set.empty)
+      waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set.empty)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 0ef3405..d89a9df 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -36,7 +36,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
     val response = sendCreateTopicRequest(request)
 
-    val error = response.errors.values.asScala.find(!_.is(Errors.NONE))
+    val error = response.errors.values.asScala.find(_.isFailure)
     assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
 
     request.topics.asScala.foreach { case (topic, details) =>
@@ -118,7 +118,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
         assertEquals(expected.messageWithFallback, actual.messageWithFallback)
       }
       // If no error validate topic exists
-      if (expectedError.is(Errors.NONE) && !request.validateOnly) {
+      if (expectedError.isSuccess && !request.validateOnly) {
         validateTopicExists(topic)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc6a3bc6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 2d47876..ebb3344 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -193,7 +193,7 @@ public class StreamsKafkaClient {
 
         for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
             ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
-            if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
+            if (error.isFailure() && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
                 throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
             }
         }