You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 01:36:04 UTC

[jira] [Commented] (KAFKA-3266) Implement KIP-140 RPCs and APIs for creating, altering, and listing ACLs

    [ https://issues.apache.org/jira/browse/KAFKA-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300822#comment-16300822 ] 

ASF GitHub Bot commented on KAFKA-3266:
---------------------------------------

ijuma closed pull request #1005: KAFKA-3266: List/Alter Acls - protocol and server side implemen…
URL: https://github.com/apache/kafka/pull/1005
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 051c8d13ef6..88cb38ab17a 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -85,6 +85,7 @@
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.record" />
+      <allow pkg="org.apache.kafka.common.requests" />
       <!-- for testing -->
       <allow pkg="org.apache.kafka.common.errors" />
     </subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e8fd3d3215e..bd2959beddc 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -36,7 +36,9 @@
     LEAVE_GROUP(13, "LeaveGroup"),
     SYNC_GROUP(14, "SyncGroup"),
     DESCRIBE_GROUPS(15, "DescribeGroups"),
-    LIST_GROUPS(16, "ListGroups");
+    LIST_GROUPS(16, "ListGroups"),
+    LIST_ACLS(17, "ListAcls"),
+    ALTER_ACLS(18, "AlterAcls");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 43110b51528..70c606049b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -718,6 +718,61 @@
     public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
     public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
 
+    /* Common acls api */
+    public static final Schema ACLS_RESOURCE_V0 = new Schema(
+        new Field("resource_type", INT8, "The id of the resource type"),
+        new Field("resource_name", STRING));
+
+    public static final Schema ACLS_ACL_V0 = new Schema(
+        new Field("acl_principle", STRING),
+        new Field("acl_permission_type", INT8, "The id of the permission type"),
+        new Field("acl_host", STRING),
+        new Field("acl_operation", INT8, "The id of the operation"));
+
+    /* List acls api */
+    public static final Schema LIST_ACLS_REQUEST_V0 = new Schema(
+        new Field("principal", NULLABLE_STRING, "The principle to list acls for. null indicates all"),
+        new Field("resource", ACLS_RESOURCE_V0, "The resoure to list acls for. A resource_type of -1 indicates all."));
+
+    public static final Schema LIST_ACLS_RESPONSE_RESOURCE_ACLS_V0 = new Schema(
+        new Field("resource", ACLS_RESOURCE_V0),
+        new Field("acls", new ArrayOf(ACLS_ACL_V0)));
+
+    public static final Schema LIST_ACLS_RESPONSE_V0 = new Schema(
+        new Field("responses", new ArrayOf(LIST_ACLS_RESPONSE_RESOURCE_ACLS_V0)),
+        new Field("error_code", INT16));
+
+    public static final Schema[] LIST_ACLS_REQUEST = new Schema[] {LIST_ACLS_REQUEST_V0};
+    public static final Schema[] LIST_ACLS_RESPONSE = new Schema[] {LIST_ACLS_RESPONSE_V0};
+
+    /* Alter acls api */
+    public static final Schema ALTER_ACLS_REQUEST_ACTION_ENTRY_V0 = new Schema(
+        new Field("action", INT8, "The id of the action to take"),
+        new Field("acl", ACLS_ACL_V0));
+
+    public static final Schema ALTER_ACLS_REQUEST_RESOURCE_ENTRY_V0 = new Schema(
+        new Field("resource", ACLS_RESOURCE_V0),
+        new Field("actions", new ArrayOf(ALTER_ACLS_REQUEST_ACTION_ENTRY_V0)));
+
+    public static final Schema ALTER_ACLS_REQUEST_V0 = new Schema(
+        new Field("requests", new ArrayOf(ALTER_ACLS_REQUEST_RESOURCE_ENTRY_V0)));
+
+
+    public static final Schema ALTER_ACLS_RESPONSE_ACTION_ENTRY_V0 = new Schema(
+        new Field("action", INT8, "The id of the action to taken"),
+        new Field("acl", ACLS_ACL_V0),
+        new Field("error_code", INT16));
+
+    public static final Schema ALTER_ACLS_RESPONSE_RESOURCE_ENTRY_V0 = new Schema(
+        new Field("resource", ACLS_RESOURCE_V0),
+        new Field("results", new ArrayOf(ALTER_ACLS_RESPONSE_ACTION_ENTRY_V0)));
+
+    public static final Schema ALTER_ACLS_RESPONSE_V0 = new Schema(
+        new Field("responses", new ArrayOf(ALTER_ACLS_RESPONSE_RESOURCE_ENTRY_V0)));
+
+    public static final Schema[] ALTER_ACLS_REQUEST = new Schema[] {ALTER_ACLS_REQUEST_V0};
+    public static final Schema[] ALTER_ACLS_RESPONSE = new Schema[] {ALTER_ACLS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -744,6 +799,8 @@
         REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
         REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
         REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
+        REQUESTS[ApiKeys.LIST_ACLS.id] = LIST_ACLS_REQUEST;
+        REQUESTS[ApiKeys.ALTER_ACLS.id] = ALTER_ACLS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -762,6 +819,8 @@
         RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
         RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
         RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
+        RESPONSES[ApiKeys.LIST_ACLS.id] = LIST_ACLS_RESPONSE;
+        RESPONSES[ApiKeys.ALTER_ACLS.id] = ALTER_ACLS_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 5a40b7fe214..fa3f1d01c5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -72,6 +72,10 @@ public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffe
                 return DescribeGroupsRequest.parse(buffer, versionId);
             case LIST_GROUPS:
                 return ListGroupsRequest.parse(buffer, versionId);
+            case LIST_ACLS:
+                return ListAclsRequest.parse(buffer, versionId);
+            case ALTER_ACLS:
+                return AlterAclsRequest.parse(buffer, versionId);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsRequest.java
new file mode 100644
index 00000000000..f7d85080225
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsRequest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AlterAclsResponse.ActionResponse;
+import org.apache.kafka.common.security.auth.Acl;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.Operation;
+import org.apache.kafka.common.security.auth.PermissionType;
+import org.apache.kafka.common.security.auth.Resource;
+import org.apache.kafka.common.security.auth.ResourceType;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterAclsRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.ALTER_ACLS.id);
+
+    public static final String REQUESTS_KEY_NAME = "requests";
+
+    public static final String RESOURCE_KEY_NAME = "resource";
+    public static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    public static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    public static final String ACTIONS_KEY_NAME = "actions";
+    public static final String ACTION_KEY_NAME = "action";
+
+    public static final String ACL_KEY_NAME = "acl";
+    public static final String ACL_PRINCIPLE_KEY_NAME = "acl_principle";
+    public static final String ACL_PERMISSION_TYPE_KEY_NAME = "acl_permission_type";
+    public static final String ACL_HOST_KEY_NAME = "acl_host";
+    public static final String ACL_OPERATION_KEY_NAME = "acl_operation";
+
+    public static enum Action {
+        DELETE((byte) 0, "Delete"),
+        ADD((byte) 1, "Add");
+
+        private static Action[] idToAction;
+        private static Map<String, Action> nameToAction;
+        public static final int MAX_ID;
+
+        static {
+            int maxId = -1;
+            for (ResourceType key : ResourceType.values()) {
+                maxId = Math.max(maxId, key.id);
+            }
+            idToAction = new Action[maxId + 1];
+            nameToAction = new HashMap<>();
+            for (Action key : Action.values()) {
+                idToAction[key.id] = key;
+                nameToAction.put(key.name.toUpperCase(), key);
+            }
+            MAX_ID = maxId;
+        }
+
+        public final byte id;
+        public final String name;
+
+        private Action(byte id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** Case insensitive lookup by name */
+        public static Action forName(String name) {
+            Action action = nameToAction.get(name.toUpperCase());
+            if (action == null) {
+                throw new IllegalArgumentException(String.format("No enum constant with name %s", name));
+            }
+            return action;
+        }
+
+        public static Action forId(byte id) {
+            return idToAction[id];
+        }
+    }
+
+    public static final class ActionRequest {
+        public final Action action;
+        public final Acl acl;
+
+        public ActionRequest(Action action, Acl acl) {
+            this.action = action;
+            this.acl = acl;
+        }
+    }
+
+    /**
+     * Possible error codes:
+     *
+     * CLUSTER_AUTHORIZATION_FAILED(31)
+     */
+
+    private final Map<Resource, List<ActionRequest>> requests;
+
+    public AlterAclsRequest(Map<Resource, List<ActionRequest>> requests) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> requestsList = new ArrayList<>();
+        for (Map.Entry<Resource, List<ActionRequest>> request : requests.entrySet()) {
+            Struct requestStruct = struct.instance(REQUESTS_KEY_NAME);
+
+            Struct resource = requestStruct.instance(RESOURCE_KEY_NAME);
+            resource.set(RESOURCE_TYPE_KEY_NAME, request.getKey().getResourceType().id);
+            resource.set(RESOURCE_NAME_KEY_NAME, request.getKey().getName());
+
+            List<Struct> actionRequests = new ArrayList<>();
+            for (ActionRequest actionRequest : request.getValue()) {
+                Struct actionRequestStruct = requestStruct.instance(ACTIONS_KEY_NAME);
+
+                actionRequestStruct.set(ACTION_KEY_NAME, actionRequest.action.id);
+
+                Struct aclStruct = actionRequestStruct.instance(ACL_KEY_NAME);
+                aclStruct.set(ACL_PRINCIPLE_KEY_NAME, actionRequest.acl.getPrincipal().toString());
+                aclStruct.set(ACL_PERMISSION_TYPE_KEY_NAME, actionRequest.acl.getPermissionType().id);
+                aclStruct.set(ACL_HOST_KEY_NAME, actionRequest.acl.getHost());
+                aclStruct.set(ACL_OPERATION_KEY_NAME, actionRequest.acl.getOperation().id);
+
+                actionRequestStruct.set(ACL_KEY_NAME, aclStruct);
+
+                actionRequests.add(actionRequestStruct);
+            }
+
+            requestStruct.set(RESOURCE_KEY_NAME, resource);
+            requestStruct.set(ACTIONS_KEY_NAME, actionRequests.toArray());
+
+            requestsList.add(requestStruct);
+        }
+
+        struct.set(REQUESTS_KEY_NAME, requestsList.toArray());
+
+        this.requests = requests;
+    }
+
+    public AlterAclsRequest(Struct struct) {
+        super(struct);
+
+        Map<Resource, List<ActionRequest>> requests = new HashMap<>();
+
+        Object[] requestsArray = (Object[]) struct.get(REQUESTS_KEY_NAME);
+        for (Object requestObj : requestsArray) {
+            Struct request = (Struct) requestObj;
+
+            Struct resourceStruct = request.getStruct(RESOURCE_KEY_NAME);
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            List<ActionRequest> actionRequests = new ArrayList<>();
+            Object[] actionRequestArray = (Object[]) request.get(ACTIONS_KEY_NAME);
+            for (Object actionRequestObj : actionRequestArray) {
+                Struct actionRequestStruct = (Struct) actionRequestObj;
+
+                Action action = Action.forId(actionRequestStruct.getByte(ACTION_KEY_NAME));
+
+                Struct aclStruct = actionRequestStruct.getStruct(ACL_KEY_NAME);
+                KafkaPrincipal principal = KafkaPrincipal.fromString(aclStruct.getString(ACL_PRINCIPLE_KEY_NAME));
+                PermissionType permissionType = PermissionType.forId(aclStruct.getByte(ACL_PERMISSION_TYPE_KEY_NAME));
+                String host = aclStruct.getString(ACL_HOST_KEY_NAME);
+                Operation operation = Operation.forId(aclStruct.getByte(ACL_OPERATION_KEY_NAME));
+                Acl acl = new Acl(principal, permissionType, host, operation);
+
+                actionRequests.add(new ActionRequest(action, acl));
+            }
+            requests.put(resource, actionRequests);
+        }
+
+        this.requests = requests;
+    }
+
+    public Map<Resource, List<ActionRequest>> requests() {
+        return this.requests;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Errors error = Errors.forException(e);
+
+        Map<Resource, List<ActionResponse>> results = new HashMap<>();
+        for (Map.Entry<Resource, List<ActionRequest>> request : requests.entrySet()) {
+            List<ActionResponse> actionResults = new ArrayList<>();
+            for (ActionRequest actionRequest : request.getValue()) {
+                actionResults.add(new ActionResponse(actionRequest.action, actionRequest.acl, error));
+            }
+            results.put(request.getKey(), actionResults);
+        }
+
+        switch (versionId) {
+            case 0:
+                return new AlterAclsResponse(results);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.ALTER_ACLS.id)));
+        }
+    }
+
+    public static AlterAclsRequest parse(ByteBuffer buffer, int versionId) {
+        return new AlterAclsRequest(ProtoUtils.parseRequest(ApiKeys.ALTER_ACLS.id, versionId, buffer));
+    }
+
+    public static AlterAclsRequest parse(ByteBuffer buffer) {
+        return new AlterAclsRequest(CURRENT_SCHEMA.read(buffer));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsResponse.java
new file mode 100644
index 00000000000..113cdcf4741
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterAclsResponse.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AlterAclsRequest.Action;
+import org.apache.kafka.common.security.auth.Acl;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.Operation;
+import org.apache.kafka.common.security.auth.PermissionType;
+import org.apache.kafka.common.security.auth.Resource;
+import org.apache.kafka.common.security.auth.ResourceType;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlterAclsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.ALTER_ACLS.id);
+
+    public static final String RESPONSES_KEY_NAME = "responses";
+
+    public static final String RESOURCE_KEY_NAME = "resource";
+    public static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    public static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    public static final String RESULTS_KEY_NAME = "results";
+    public static final String ACTION_KEY_NAME = "action";
+
+    public static final String ACL_KEY_NAME = "acl";
+    public static final String ACL_PRINCIPLE_KEY_NAME = "acl_principle";
+    public static final String ACL_PERMISSION_TYPE_KEY_NAME = "acl_permission_type";
+    public static final String ACL_HOST_KEY_NAME = "acl_host";
+    public static final String ACL_OPERATION_KEY_NAME = "acl_operation";
+
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    public static final class ActionResponse {
+        public final Action action;
+        public final Acl acl;
+        public final Errors error;
+
+        public ActionResponse(Action action, Acl acl, Errors error) {
+            this.action = action;
+            this.acl = acl;
+            this.error = error;
+        }
+    }
+
+    /**
+     * Possible error codes:
+     *
+     * CLUSTER_AUTHORIZATION_FAILED(31)
+     */
+
+    private final Map<Resource, List<ActionResponse>> results;
+
+    public AlterAclsResponse(Map<Resource, List<ActionResponse>> results) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> responsesArray = new ArrayList<>();
+        for (Map.Entry<Resource, List<ActionResponse>> result : results.entrySet()) {
+            Struct response = struct.instance(RESPONSES_KEY_NAME);
+
+            Struct resource = response.instance(RESOURCE_KEY_NAME);
+            resource.set(RESOURCE_TYPE_KEY_NAME, result.getKey().getResourceType().id);
+            resource.set(RESOURCE_NAME_KEY_NAME, result.getKey().getName());
+
+            List<Struct> actionResults = new ArrayList<>();
+            for (ActionResponse actionResult : result.getValue()) {
+                Struct actionResultStruct = response.instance(RESULTS_KEY_NAME);
+
+                actionResultStruct.set(ACTION_KEY_NAME, actionResult.action.id);
+
+                Struct aclStruct = actionResultStruct.instance(ACL_KEY_NAME);
+                aclStruct.set(ACL_PRINCIPLE_KEY_NAME, actionResult.acl.getPrincipal().toString());
+                aclStruct.set(ACL_PERMISSION_TYPE_KEY_NAME, actionResult.acl.getPermissionType().id);
+                aclStruct.set(ACL_HOST_KEY_NAME, actionResult.acl.getHost());
+                aclStruct.set(ACL_OPERATION_KEY_NAME, actionResult.acl.getOperation().id);
+
+                actionResultStruct.set(ACL_KEY_NAME, aclStruct);
+
+                actionResultStruct.set(ERROR_CODE_KEY_NAME, actionResult.error.code());
+
+                actionResults.add(actionResultStruct);
+            }
+
+            response.set(RESOURCE_KEY_NAME, resource);
+            response.set(RESULTS_KEY_NAME, actionResults.toArray());
+
+            responsesArray.add(response);
+        }
+
+        struct.set(RESPONSES_KEY_NAME, responsesArray.toArray());
+
+        this.results = results;
+    }
+
+    public AlterAclsResponse(Struct struct) {
+        super(struct);
+
+        Map<Resource, List<ActionResponse>> results = new HashMap<>();
+
+        Object[] responses = (Object[]) struct.get(RESPONSES_KEY_NAME);
+        for (Object responseObj : responses) {
+            Struct response = (Struct) responseObj;
+
+            Struct resourceStruct = response.getStruct(RESOURCE_KEY_NAME);
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            List<ActionResponse> actionResults = new ArrayList<>();
+            Object[] actionResultsArray = (Object[]) response.get(RESULTS_KEY_NAME);
+            for (Object actionResultObj : actionResultsArray) {
+                Struct actionResultStruct = (Struct) actionResultObj;
+
+                Action action = Action.forId(actionResultStruct.getByte(ACTION_KEY_NAME));
+
+                Struct aclStruct = actionResultStruct.getStruct(ACL_KEY_NAME);
+                KafkaPrincipal principal = KafkaPrincipal.fromString(aclStruct.getString(ACL_PRINCIPLE_KEY_NAME));
+                PermissionType permissionType = PermissionType.forId(aclStruct.getByte(ACL_PERMISSION_TYPE_KEY_NAME));
+                String host = aclStruct.getString(ACL_HOST_KEY_NAME);
+                Operation operation = Operation.forId(aclStruct.getByte(ACL_OPERATION_KEY_NAME));
+                Acl acl = new Acl(principal, permissionType, host, operation);
+
+                Errors error = Errors.forCode(actionResultStruct.getShort(ERROR_CODE_KEY_NAME));
+
+                actionResults.add(new ActionResponse(action, acl, error));
+            }
+            results.put(resource, actionResults);
+        }
+
+        this.results = results;
+    }
+
+    public Map<Resource, List<ActionResponse>> results() {
+        return this.results;
+    }
+
+    public static AlterAclsResponse parse(ByteBuffer buffer) {
+        return new AlterAclsResponse(CURRENT_SCHEMA.read(buffer));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListAclsRequest.java
new file mode 100644
index 00000000000..469daceeec6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListAclsRequest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.Acl;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.Resource;
+import org.apache.kafka.common.security.auth.ResourceType;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+
+public class ListAclsRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_ACLS.id);
+
+    public static final String PRINCIPLE_KEY_NAME = "principal";
+
+    public static final String RESOURCE_KEY_NAME = "resource";
+    public static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    public static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    public static final Byte ALL_RESOURCE_TYPE_ID = -1;
+
+    private final KafkaPrincipal principal;
+    private final Resource resource;
+
+    public ListAclsRequest() {
+        this(null, null);
+    }
+
+    public ListAclsRequest(KafkaPrincipal principal) {
+        this(principal, null);
+    }
+
+    public ListAclsRequest(Resource resource) {
+        this(null, resource);
+    }
+
+    public ListAclsRequest(KafkaPrincipal principal, Resource resource) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        if (principal == null) {
+            struct.set(PRINCIPLE_KEY_NAME, null);
+        } else {
+            struct.set(PRINCIPLE_KEY_NAME, principal.toString());
+        }
+
+        Struct resourceStruct = struct.instance(RESOURCE_KEY_NAME);
+        if (resource == null) {
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, ALL_RESOURCE_TYPE_ID);
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, "");
+        } else {
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.getResourceType().id);
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.getName());
+        }
+        struct.set(RESOURCE_KEY_NAME, resourceStruct);
+
+        this.principal = principal;
+        this.resource = resource;
+    }
+
+    public ListAclsRequest(Struct struct) {
+        super(struct);
+
+        String principleStr = struct.getString(PRINCIPLE_KEY_NAME);
+        if (principleStr == null) {
+            this.principal = null;
+        } else {
+            this.principal = KafkaPrincipal.fromString(struct.getString(PRINCIPLE_KEY_NAME));
+        }
+
+        Struct resourceStruct = struct.getStruct(RESOURCE_KEY_NAME);
+        byte resourceTypeId = resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME);
+        if (resourceTypeId == ALL_RESOURCE_TYPE_ID) {
+            this.resource = null;
+        } else {
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            this.resource = new Resource(resourceType, resourceName);
+        }
+    }
+
+    public boolean hasPrincipal() {
+        return principal != null;
+    }
+
+    public KafkaPrincipal getPrincipal() {
+        return principal;
+    }
+
+    public boolean hasResource() {
+        return resource != null;
+    }
+
+    public Resource getResource() {
+        return resource;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new ListAclsResponse(Collections.<Resource, Set<Acl>>emptyMap(), Errors.forException(e));
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_ACLS.id)));
+        }
+    }
+
+    public static ListAclsRequest parse(ByteBuffer buffer, int versionId) {
+        return new ListAclsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_ACLS.id, versionId, buffer));
+    }
+
+    public static ListAclsRequest parse(ByteBuffer buffer) {
+        return new ListAclsRequest(CURRENT_SCHEMA.read(buffer));
+    }
+
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListAclsResponse.java
new file mode 100644
index 00000000000..91ab6e71dad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListAclsResponse.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.Acl;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.Operation;
+import org.apache.kafka.common.security.auth.PermissionType;
+import org.apache.kafka.common.security.auth.Resource;
+import org.apache.kafka.common.security.auth.ResourceType;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ListAclsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_ACLS.id);
+
+    public static final String RESPONSES_KEY_NAME = "responses";
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    public static final String RESOURCE_KEY_NAME = "resource";
+    public static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    public static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    public static final String ACLS_KEY_NAME = "acls";
+    public static final String ACL_PRINCIPLE_KEY_NAME = "acl_principle";
+    public static final String ACL_PERMISSION_TYPE_KEY_NAME = "acl_permission_type";
+    public static final String ACL_HOST_KEY_NAME = "acl_host";
+    public static final String ACL_OPERATION_KEY_NAME = "acl_operation";
+
+    /**
+     * Possible error codes:
+     *
+     * CLUSTER_AUTHORIZATION_FAILED(31)
+     */
+
+    private final Map<Resource, Set<Acl>> acls;
+    private final Errors error;
+
+    public ListAclsResponse(Map<Resource, Set<Acl>> acls, Errors error) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> responses = new ArrayList<>();
+        for (Map.Entry<Resource, Set<Acl>> aclEntry : acls.entrySet()) {
+            Struct response = struct.instance(RESPONSES_KEY_NAME);
+
+            Struct resource = response.instance(RESOURCE_KEY_NAME);
+            resource.set(RESOURCE_TYPE_KEY_NAME, aclEntry.getKey().getResourceType().id);
+            resource.set(RESOURCE_NAME_KEY_NAME, aclEntry.getKey().getName());
+
+            Set<Struct> aclsSet = new HashSet<>();
+            for (Acl acl : aclEntry.getValue()) {
+                Struct aclStruct = response.instance(ACLS_KEY_NAME);
+                aclStruct.set(ACL_PRINCIPLE_KEY_NAME, acl.getPrincipal().toString());
+                aclStruct.set(ACL_PERMISSION_TYPE_KEY_NAME, acl.getPermissionType().id);
+                aclStruct.set(ACL_HOST_KEY_NAME, acl.getHost());
+                aclStruct.set(ACL_OPERATION_KEY_NAME, acl.getOperation().id);
+
+                aclsSet.add(aclStruct);
+            }
+
+            response.set(RESOURCE_KEY_NAME, resource);
+            response.set(ACLS_KEY_NAME, aclsSet.toArray());
+
+            responses.add(response);
+        }
+
+        struct.set(RESPONSES_KEY_NAME, responses.toArray());
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+
+        this.acls = acls;
+        this.error = error;
+    }
+
+    public ListAclsResponse(Struct struct) {
+        super(struct);
+
+        Map<Resource, Set<Acl>> acls = new HashMap<>();
+
+        Object[] responses = (Object[]) struct.get(RESPONSES_KEY_NAME);
+        for (Object responseObj : responses) {
+            Struct response = (Struct) responseObj;
+
+            Struct resourceStruct = response.getStruct(RESOURCE_KEY_NAME);
+            ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            Set<Acl> aclList = new HashSet<>();
+            Object[] aclArray = (Object[]) response.get(ACLS_KEY_NAME);
+            for (Object aclObj : aclArray) {
+                Struct aclStruct = (Struct) aclObj;
+                KafkaPrincipal principal = KafkaPrincipal.fromString(aclStruct.getString(ACL_PRINCIPLE_KEY_NAME));
+                PermissionType permissionType = PermissionType.forId(aclStruct.getByte(ACL_PERMISSION_TYPE_KEY_NAME));
+                String host = aclStruct.getString(ACL_HOST_KEY_NAME);
+                Operation operation = Operation.forId(aclStruct.getByte(ACL_OPERATION_KEY_NAME));
+                Acl acl = new Acl(principal, permissionType, host, operation);
+
+                aclList.add(acl);
+            }
+            acls.put(resource, aclList);
+        }
+
+        this.acls = acls;
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+    }
+
+    public Errors error() {
+        return this.error;
+    }
+
+    public Map<Resource, Set<Acl>> acls() {
+        return this.acls;
+    }
+
+    public static ListAclsResponse parse(ByteBuffer buffer) {
+        return new ListAclsResponse(CURRENT_SCHEMA.read(buffer));
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Acl.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Acl.java
new file mode 100644
index 00000000000..a6f53a6f9a6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Acl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+public class Acl {
+    public static final String WILDCARD_HOST = "*";
+
+    private KafkaPrincipal principal;
+    private PermissionType permissionType;
+    private String host;
+    private Operation operation;
+
+    public Acl(KafkaPrincipal principal, PermissionType permissionType, String host, Operation operation) {
+        if (principal == null || permissionType == null || host == null || operation == null) {
+            throw new IllegalArgumentException("principal, permissionType, host or operation can not be null");
+        }
+        this.principal = principal;
+        this.permissionType = permissionType;
+        this.host = host;
+        this.operation = operation;
+    }
+
+    public KafkaPrincipal getPrincipal() {
+        return principal;
+    }
+
+    public PermissionType getPermissionType() {
+        return permissionType;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public Operation getOperation() {
+        return operation;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Acl acl = (Acl) o;
+
+        if (!principal.equals(acl.principal)) return false;
+        if (permissionType != acl.permissionType) return false;
+        if (!host.equals(acl.host)) return false;
+        return operation == acl.operation;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = principal.hashCode();
+        result = 31 * result + permissionType.hashCode();
+        result = 31 * result + host.hashCode();
+        result = 31 * result + operation.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s has %s permission for operations: %s from hosts: %s", principal, permissionType.name, operation.name, host);
+    }
+
+}
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Operation.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Operation.java
new file mode 100644
index 00000000000..68517548ca8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Operation.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum Operation {
+    READ((byte) 0, "Read"),
+    WRITE((byte) 1, "Write"),
+    CREATE((byte) 2, "Create"),
+    DELETE((byte) 3, "Delete"),
+    ALTER((byte) 4, "Alter"),
+    DESCRIBE((byte) 5, "Describe"),
+    CLUSTER_ACTION((byte) 6, "ClusterAction"),
+    ALL((byte) 7, "All");
+
+    private static Operation[] idToOperation;
+    private static Map<String, Operation> nameToOperation;
+    public static final int MAX_ID;
+
+    static {
+        int maxId = -1;
+        for (Operation key : Operation.values()) {
+            maxId = Math.max(maxId, key.id);
+        }
+        idToOperation = new Operation[maxId + 1];
+        nameToOperation = new HashMap<>();
+        for (Operation key : Operation.values()) {
+            idToOperation[key.id] = key;
+            nameToOperation.put(key.name.toUpperCase(), key);
+        }
+        MAX_ID = maxId;
+    }
+
+    public final byte id;
+    public final String name;
+
+    private Operation(byte id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /** Case insensitive lookup by name */
+    public static Operation forName(String name) {
+        Operation operation = nameToOperation.get(name.toUpperCase());
+        if (operation == null) {
+            throw new IllegalArgumentException(String.format("No enum constant with name %s", name));
+        }
+        return operation;
+    }
+
+    public static Operation forId(byte id) {
+        return idToOperation[id];
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PermissionType.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PermissionType.java
new file mode 100644
index 00000000000..efddda77369
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PermissionType.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PermissionType {
+    ALLOW((byte) 0, "Allow"),
+    DENY((byte) 1, "Deny");
+
+    private static PermissionType[] idToType;
+    private static Map<String, PermissionType> nameToPermissionType;
+    public static final int MAX_ID;
+
+    static {
+        int maxId = -1;
+        for (PermissionType key : PermissionType.values()) {
+            maxId = Math.max(maxId, key.id);
+        }
+        idToType = new PermissionType[maxId + 1];
+        nameToPermissionType = new HashMap<>();
+        for (PermissionType key : PermissionType.values()) {
+            idToType[key.id] = key;
+            nameToPermissionType.put(key.name.toUpperCase(), key);
+        }
+        MAX_ID = maxId;
+    }
+
+    public final byte id;
+    public final String name;
+
+    private PermissionType(byte id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /** Case insensitive lookup by name */
+    public static PermissionType forName(String name) {
+        PermissionType permissionType = nameToPermissionType.get(name.toUpperCase());
+        if (permissionType == null) {
+            throw new IllegalArgumentException(String.format("No enum constant with name %s", name));
+        }
+        return permissionType;
+    }
+
+    public static PermissionType forId(byte id) {
+        return idToType[id];
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Resource.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Resource.java
new file mode 100644
index 00000000000..476fef3d530
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Resource.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+public class Resource {
+    public static final String SEPARATOR = ":";
+    public static final String WILDCARD_RESOURCE = "*";
+    public static final String CLUSTER_RESOURCE_NAME = "kafka-cluster";
+    public static final Resource CLUSTER_RESOURCE = new Resource(ResourceType.CLUSTER, CLUSTER_RESOURCE_NAME);
+
+    private ResourceType resourceType;
+    private String name;
+
+    public Resource(ResourceType resourceType, String name) {
+        if (resourceType == null || name == null) {
+            throw new IllegalArgumentException("resourceType and name can not be null");
+        }
+        this.resourceType = resourceType;
+        this.name = name;
+    }
+
+    public static Resource fromString(String str) {
+        if (str == null || str.isEmpty()) {
+            throw new IllegalArgumentException("expected a string in format resourceType:resourceName but got " + str);
+        }
+
+        String[] split = str.split(SEPARATOR, 2);
+
+        if (split == null || split.length != 2) {
+            throw new IllegalArgumentException("expected a string in format principalType:resourceName but got " + str);
+        }
+
+        ResourceType resourceType = ResourceType.forName(split[0]);
+        return new Resource(resourceType, split[1]);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public ResourceType getResourceType() {
+        return resourceType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Resource resource = (Resource) o;
+
+        if (resourceType != resource.resourceType) return false;
+        return name.equals(resource.name);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = resourceType.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return resourceType.name + SEPARATOR + name;
+    }
+}
+
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/security/auth/ResourceType.java
new file mode 100644
index 00000000000..1e111fd5214
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/auth/ResourceType.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum ResourceType {
+    CLUSTER((byte) 0, "Cluster"),
+    TOPIC((byte) 1, "Topic"),
+    GROUP((byte) 2, "Group");
+
+    private static ResourceType[] idToType;
+    private static Map<String, ResourceType> nameToResourceType;
+    public static final int MAX_ID;
+
+    static {
+        int maxId = -1;
+        for (ResourceType key : ResourceType.values()) {
+            maxId = Math.max(maxId, key.id);
+        }
+        idToType = new ResourceType[maxId + 1];
+        nameToResourceType = new HashMap<>();
+        for (ResourceType key : ResourceType.values()) {
+            idToType[key.id] = key;
+            nameToResourceType.put(key.name.toUpperCase(), key);
+        }
+        MAX_ID = maxId;
+    }
+
+    public final byte id;
+    public final String name;
+
+    private ResourceType(byte id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /** Case insensitive lookup by name */
+    public static ResourceType forName(String name) {
+        ResourceType resourceType = nameToResourceType.get(name.toUpperCase());
+        if (resourceType == null) {
+            throw new IllegalArgumentException(String.format("No enum constant with name %s", name));
+        }
+        return resourceType;
+    }
+
+    public static ResourceType forId(byte id) {
+        return idToType[id];
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9def5577a5b..23c44659e68 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -21,6 +21,15 @@
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AlterAclsRequest.Action;
+import org.apache.kafka.common.requests.AlterAclsRequest.ActionRequest;
+import org.apache.kafka.common.requests.AlterAclsResponse.ActionResponse;
+import org.apache.kafka.common.security.auth.Acl;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.Operation;
+import org.apache.kafka.common.security.auth.PermissionType;
+import org.apache.kafka.common.security.auth.Resource;
+import org.apache.kafka.common.security.auth.ResourceType;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -91,7 +100,12 @@ public void testSerialization() throws Exception {
                 createUpdateMetadataResponse(),
                 createLeaderAndIsrRequest(),
                 createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
-                createLeaderAndIsrResponse()
+                createLeaderAndIsrResponse(),
+                createListAclsRequest(null, null),
+                createListAclsRequest(KafkaPrincipal.ANONYMOUS, Resource.CLUSTER_RESOURCE),
+                createListAclsResponse(),
+                createAlterAclsRequest(),
+                createAlterAclsResponse()
         );
 
         for (AbstractRequestResponse req : requestResponseList)
@@ -425,5 +439,32 @@ private AbstractRequestResponse createUpdateMetadataResponse() {
         return new UpdateMetadataResponse(Errors.NONE.code());
     }
 
+    private AbstractRequest createListAclsRequest(KafkaPrincipal principal, Resource resource) {
+        return new ListAclsRequest(principal, resource);
+    }
+
+    private AbstractRequestResponse createListAclsResponse() {
+        Map<Resource, Set<Acl>> acls = new HashMap<>();
+        Set<Acl> aclSet = new HashSet<>();
+        aclSet.add(new Acl(KafkaPrincipal.ANONYMOUS, PermissionType.ALLOW, "*", Operation.ALL));
+        acls.put(new Resource(ResourceType.TOPIC, "topic"), aclSet);
+
+        return new ListAclsResponse(acls, Errors.NONE);
+    }
 
+    private AbstractRequest createAlterAclsRequest() {
+        Map<Resource, List<ActionRequest>> requests = new HashMap<>();
+        List<ActionRequest> actions = new ArrayList<>();
+        actions.add(new ActionRequest(Action.ADD, new Acl(KafkaPrincipal.ANONYMOUS, PermissionType.ALLOW, "*", Operation.ALL)));
+        requests.put(Resource.CLUSTER_RESOURCE, actions);
+        return new AlterAclsRequest(requests);
+    }
+
+    private AbstractRequestResponse createAlterAclsResponse() {
+        Map<Resource, List<ActionResponse>> responses = new HashMap<>();
+        List<ActionResponse> actionResponses = new ArrayList<>();
+        actionResponses.add(new ActionResponse(Action.ADD, new Acl(KafkaPrincipal.ANONYMOUS, PermissionType.ALLOW, "*", Operation.ALL), Errors.NONE));
+        responses.put(Resource.CLUSTER_RESOURCE, actionResponses);
+        return new AlterAclsResponse(responses);
+    }
 }
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index c23dd2d81f5..3f9816fe81a 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -18,7 +18,7 @@
 package kafka.security.auth
 
 import kafka.utils.Json
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, Acl => JAcl, Operation => JOperation, PermissionType => JPermissionType}
 
 object Acl {
   val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
@@ -75,6 +75,14 @@ object Acl {
     acls.toSet
   }
 
+  def fromJava(jAcl: JAcl): Acl = {
+    new Acl(
+      jAcl.getPrincipal,
+      PermissionType.fromString(jAcl.getPermissionType.name),
+      jAcl.getHost,
+      Operation.fromString(jAcl.getOperation.name))
+  }
+
   def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
     Map(Acl.VersionKey -> Acl.CurrentVersion, Acl.AclsKey -> acls.map(acl => acl.toMap).toList)
   }
@@ -85,7 +93,8 @@ object Acl {
  * <pre>
  * Principal P has permissionType PT on Operation O1 from hosts H1.
  * </pre>
- * @param principal A value of *:* indicates all users.
+  *
+  * @param principal A value of *:* indicates all users.
  * @param permissionType
  * @param host A value of * indicates all hosts.
  * @param operation A value of ALL indicates all operations.
@@ -94,7 +103,8 @@ case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host:
 
   /**
    * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
-   * @return Map representation of the Acl.
+    *
+    * @return Map representation of the Acl.
    */
   def toMap(): Map[String, Any] = {
     Map(Acl.PrincipalKey -> principal.toString,
@@ -107,5 +117,14 @@ case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host:
     "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
   }
 
+  def asJava: JAcl = {
+    new JAcl(
+      principal,
+      JPermissionType.forName(permissionType.name),
+      host,
+      JOperation.forName(operation.name)
+    )
+  }
+
 }
 
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 797c77bb6cc..ac7f3c459a9 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -16,6 +16,8 @@
  */
 package kafka.security.auth
 
+import org.apache.kafka.common.security.auth.{Resource => JResource, ResourceType => JResourceType}
+
 object Resource {
   val Separator = ":"
   val ClusterResourceName = "kafka-cluster"
@@ -28,6 +30,10 @@ object Resource {
       case s => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
     }
   }
+
+  def fromJava(jResource: JResource): Resource = {
+    new Resource(ResourceType.fromString(jResource.getResourceType.name), jResource.getName)
+  }
 }
 
 /**
@@ -41,5 +47,9 @@ case class Resource(val resourceType: ResourceType, val name: String) {
   override def toString: String = {
     resourceType.name + Resource.Separator + name
   }
+
+  def asJava: JResource = {
+    new JResource(JResourceType.forName(resourceType.name), name)
+  }
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4f77d302734..10910ff49af 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.lang.{Long => JLong, Short => JShort}
-import java.util.Properties
+import java.util.{Set => JSet, Properties}
 
 import kafka.admin.{RackAwareMode, AdminUtils}
 import kafka.api._
@@ -31,18 +31,21 @@ import kafka.log._
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.network._
 import kafka.network.RequestChannel.{Session, Response}
-import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
+import kafka.security.auth.{Acl, Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write, All}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException,
 ClusterAuthorizationException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
+import org.apache.kafka.common.requests.AlterAclsResponse.ActionResponse
+import org.apache.kafka.common.requests.{AlterAclsResponse, AlterAclsRequest, ListAclsResponse, ListAclsRequest,
+ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
 StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
 MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.security.auth.{Acl => JAcl, Resource => JResource, PermissionType}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
 import org.apache.kafka.common.internals.TopicConstants
@@ -93,6 +96,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
+        case ApiKeys.LIST_ACLS => handleListAclsRequest(request)
+        case ApiKeys.ALTER_ACLS => handleAlterAclsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -853,8 +858,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
-    import JavaConversions._
-
     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
     val responseHeader = new ResponseHeader(request.header.correlationId)
 
@@ -862,7 +865,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
       val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
-        joinResult.memberId, joinResult.leaderId, members)
+        joinResult.memberId, joinResult.leaderId, members.asJava)
 
       trace("Sending join group response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
@@ -876,11 +879,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
-        Map.empty[String, ByteBuffer])
+        Map.empty[String, ByteBuffer].asJava)
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
       // let the coordinator to handle join-group
-      val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
       coordinator.handleJoinGroup(
         joinGroupRequest.groupId,
@@ -895,8 +898,6 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSyncGroupRequest(request: RequestChannel.Request) {
-    import JavaConversions._
-
     val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
 
     def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
@@ -912,7 +913,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         syncGroupRequest.groupId(),
         syncGroupRequest.generationId(),
         syncGroupRequest.memberId(),
-        syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)),
+        syncGroupRequest.groupAssignment().asScala.mapValues(Utils.toArray),
         sendResponseCallback
       )
     }
@@ -993,6 +994,112 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleListAclsRequest(request: RequestChannel.Request): Unit = {
+    val listAclsRequest = request.body.asInstanceOf[ListAclsRequest]
+
+    val respHeader = new ResponseHeader(request.header.correlationId)
+    val responseBody =
+      authorizer match {
+        case None =>
+          error("Error processing list acls request. No Authorizer is available.")
+          new ListAclsResponse(Map[JResource, JSet[JAcl]]().asJava, Errors.CLUSTER_AUTHORIZATION_FAILED)
+        case Some(auth) =>
+          // authorized if the principal has all access on the cluster
+          // or the principal is requesting their own acls
+          val isAuthorized = authorize(request.session, All, Resource.ClusterResource)
+          val isUserPrinciple = listAclsRequest.hasPrincipal && listAclsRequest.getPrincipal == request.session.principal
+
+          if (isAuthorized || isUserPrinciple) {
+            val aclsByResource =
+              if (listAclsRequest.hasPrincipal)
+                auth.getAcls(listAclsRequest.getPrincipal)
+              else
+                auth.getAcls()
+
+            // only show users their acls of permission type allow
+            val filteredAclsByResource =
+              if (!isAuthorized)
+                aclsByResource.mapValues(_.filter(_.permissionType == PermissionType.ALLOW))
+              else
+                aclsByResource
+
+            val acls =
+              if (listAclsRequest.hasResource)
+                filteredAclsByResource.filterKeys(_ == listAclsRequest.getResource)
+              else
+                filteredAclsByResource
+
+            val responses =
+              acls.map { case (resource, acls) =>
+                (resource.asJava, acls.map(_.asJava).asJava)
+              }.asJava
+            new ListAclsResponse(responses, Errors.NONE)
+          } else {
+            new ListAclsResponse(Map[JResource, JSet[JAcl]]().asJava, Errors.CLUSTER_AUTHORIZATION_FAILED)
+          }
+      }
+
+    trace(s"Sending list acls response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+  }
+
+  def handleAlterAclsRequest(request: RequestChannel.Request): Unit = {
+    val alterAclsRequest = request.body.asInstanceOf[AlterAclsRequest]
+
+    def unauthorizedResponse: AlterAclsResponse = {
+      val responses = alterAclsRequest.requests.asScala.map { case (resource, actionRequests) =>
+        val actionResponses = actionRequests.asScala.map { actionRequest =>
+          new ActionResponse(actionRequest.action, actionRequest.acl, Errors.CLUSTER_AUTHORIZATION_FAILED)
+        }
+        (resource, actionResponses.asJava)
+      }
+      new AlterAclsResponse(responses.asJava)
+    }
+
+    val respHeader = new ResponseHeader(request.header.correlationId)
+    val responseBody =
+      authorizer match {
+        case None =>
+          error("Error processing list acls request. No Authorizer is available.")
+          unauthorizedResponse
+        case Some(auth) =>
+          // authorized if the principal has all access on the cluster
+          if (authorize(request.session, All, Resource.ClusterResource)) {
+            val results = alterAclsRequest.requests.asScala.map { case (resource, actionRequests) =>
+              val actionResponses = actionRequests.asScala
+                // Group acls by action
+                .groupBy(_.action).mapValues(_.map(_.acl)).toList
+                // process deletes first
+                .sortBy { case(action, acls) => action.id }
+                .flatMap { case(action, acls) =>
+                  try {
+                    val sAcls = acls.map(Acl.fromJava).toSet
+                    action match {
+                      case AlterAclsRequest.Action.DELETE =>
+                        debug(s"Deleting $acls for $resource")
+                        auth.removeAcls(sAcls, Resource.fromJava(resource))
+                      case AlterAclsRequest.Action.ADD =>
+                        debug(s"Adding $acls for $resource")
+                        auth.addAcls(sAcls, Resource.fromJava(resource))
+                    }
+                  } catch {
+                    case e: Throwable =>
+                      error("Error while responding to alter acl request", e)
+                      acls.map { acl => new ActionResponse(action, acl, Errors.forException(e)) }
+                  }
+                  acls.map { acl => new ActionResponse(action, acl, Errors.NONE) }
+                }
+              (resource, actionResponses.asJava)
+            }
+            new AlterAclsResponse(results.asJava)
+          } else
+            unauthorizedResponse
+      }
+
+    trace(s"Sending alter acls response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+  }
+
   def close() {
     quotaManagers.foreach { case (apiKey, quotaManager) =>
       quotaManager.shutdown()
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bc705f13f10..e7618e3f80a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -20,7 +20,6 @@ import java.util.{ArrayList, Collections, Properties}
 
 import kafka.cluster.EndPoint
 import kafka.common.TopicAndPartition
-import kafka.coordinator.GroupCoordinator
 import kafka.integration.KafkaServerTestHarness
 import kafka.security.auth._
 import kafka.server.KafkaConfig
@@ -29,8 +28,9 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsume
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.AlterAclsRequest.ActionRequest
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, PermissionType, Operation, Acl => JAcl, Resource => JResource}
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
@@ -57,6 +57,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val AllAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, All)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
@@ -89,7 +90,9 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       ApiKeys.LEAVE_GROUP.id -> classOf[LeaveGroupResponse],
       ApiKeys.LEADER_AND_ISR.id -> classOf[requests.LeaderAndIsrResponse],
       ApiKeys.STOP_REPLICA.id -> classOf[requests.StopReplicaResponse],
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse]
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse],
+      ApiKeys.LIST_ACLS.id -> classOf[requests.ListAclsResponse],
+      ApiKeys.ALTER_ACLS.id -> classOf[requests.AlterAclsResponse]
     )
 
   val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
@@ -107,7 +110,9 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     ApiKeys.LEAVE_GROUP.id -> ((resp: LeaveGroupResponse) => resp.errorCode()),
     ApiKeys.LEADER_AND_ISR.id -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA.id -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
+    ApiKeys.LIST_ACLS.id -> ((resp: requests.ListAclsResponse) => resp.error.code),
+    ApiKeys.ALTER_ACLS.id -> ((resp: requests.AlterAclsResponse) => resp.results().asScala.head._2.asScala.head.error.code)
   )
 
   val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
@@ -125,7 +130,9 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     ApiKeys.LEAVE_GROUP.id -> GroupReadAcl,
     ApiKeys.LEADER_AND_ISR.id -> ClusterAcl,
     ApiKeys.STOP_REPLICA.id -> ClusterAcl,
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl,
+    ApiKeys.LIST_ACLS.id -> AllAcl,
+    ApiKeys.ALTER_ACLS.id -> AllAcl
   )
 
   // configure the servers and clients
@@ -227,6 +234,18 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
     new requests.ControlledShutdownRequest(brokerId)
   }
 
+  private def createListAclsRequest = {
+    new ListAclsRequest()
+  }
+
+  private def createAlterAclsRequest = {
+    val requests = Map(
+      JResource.CLUSTER_RESOURCE -> List(
+        new ActionRequest(AlterAclsRequest.Action.ADD, new JAcl(KafkaPrincipal.ANONYMOUS, PermissionType.ALLOW, "*", Operation.DESCRIBE))
+      ).asJava)
+    new AlterAclsRequest(requests.asJava)
+  }
+
   @Test
   def testAuthorization() {
     val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
@@ -244,18 +263,34 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       ApiKeys.LEAVE_GROUP.id -> createLeaveGroupRequest,
       ApiKeys.LEADER_AND_ISR.id -> createLeaderAndIsrRequest,
       ApiKeys.STOP_REPLICA.id -> createStopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest,
+      ApiKeys.LIST_ACLS.id -> createListAclsRequest,
+      ApiKeys.ALTER_ACLS.id -> createAlterAclsRequest
     )
 
     val socket = new Socket("localhost", servers.head.boundPort())
+    try {
+      for ((key, request) <- requestKeyToRequest) {
+        removeAllAcls
+        val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
+        sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false)
+        for ((resource, acls) <- RequestKeysToAcls(key))
+          addAndVerifyAcls(acls, resource)
+        sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true)
+      }
+    } finally {
+      socket.close()
+    }
+  }
 
-    for ((key, request) <- requestKeyToRequest) {
-      removeAllAcls
-      val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false)
-      for ((resource, acls) <- RequestKeysToAcls(key))
-        addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true)
+  @Test
+  def testListAclsWithOwnPrincipal() {
+    val socket = new Socket("localhost", servers.head.boundPort())
+    try {
+      val request = new ListAclsRequest(KafkaPrincipal.ANONYMOUS)
+      sendRequestAndVerifyResponseErrorCode(socket, ApiKeys.LIST_ACLS.id, request, Set(Cluster), isAuthorized = true)
+    } finally {
+      socket.close()
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/AclsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AclsRequestTest.scala
new file mode 100644
index 00000000000..861c23c4605
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AclsRequestTest.scala
@@ -0,0 +1,176 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.nio.ByteBuffer
+
+import kafka.security.auth.{SimpleAclAuthorizer, Deny, Describe, Read, Resource, All, Allow, Acl, Topic}
+import kafka.utils._
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AlterAclsResponse, AlterAclsRequest, ListAclsResponse, ListAclsRequest, RequestHeader,
+ResponseHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{Resource => JResource, ResourceType => JResourceType}
+import org.apache.log4j.{Level, Logger}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class AclsRequestTest extends BaseRequestTest {
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    Logger.getLogger(classOf[SimpleAclAuthorizer]).setLevel(Level.DEBUG)
+    Logger.getLogger(classOf[KafkaApis]).setLevel(Level.DEBUG)
+  }
+
+  @Test
+  def testListAclsRequests() {
+    val acls1 = Set(Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, All))
+    val resource1 = Resource.ClusterResource
+    val jResource1 = resource1.asJava
+    servers.head.apis.authorizer.get.addAcls(acls1, resource1)
+
+    val acls2 = Set(Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))
+    val resource2 = new Resource(Topic, "topic")
+    val jResource2 = resource2.asJava
+    servers.head.apis.authorizer.get.addAcls(acls2, resource2)
+
+    val unusedJResource = new JResource(JResourceType.TOPIC, "unused")
+    val unusedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "unused")
+
+    TestUtils.waitAndVerifyAcls(acls1, servers.head.apis.authorizer.get, resource1)
+
+    validateListAclsRequests(new ListAclsRequest(), Map(resource1 -> acls1, resource2 -> acls2))
+    validateListAclsRequests(new ListAclsRequest(KafkaPrincipal.ANONYMOUS), Map(resource1 -> acls1, resource2 -> acls2))
+    validateListAclsRequests(new ListAclsRequest(jResource2), Map(resource2 -> acls2))
+    validateListAclsRequests(new ListAclsRequest(KafkaPrincipal.ANONYMOUS, jResource1), Map(resource1 -> acls1))
+    validateListAclsRequests(new ListAclsRequest(unusedJResource), Map())
+    validateListAclsRequests(new ListAclsRequest(unusedPrincipal), Map())
+    validateListAclsRequests(new ListAclsRequest(unusedPrincipal, unusedJResource), Map())
+  }
+
+  private def validateListAclsRequests(request: ListAclsRequest, expectedAcls: Map[Resource, Set[Acl]]): Unit = {
+    val response = sendListAclsRequest(request)
+
+    assertEquals(s"There should be no errors", Errors.NONE, response.error)
+
+    response.acls.asScala.foreach { case (resource, acls) =>
+      val sResource = Resource.fromJava(resource)
+
+
+      assertTrue("The resource should have acls", expectedAcls.contains(sResource))
+      assertEquals("The resource should have the correct number of acls", expectedAcls.get(sResource).size, acls.size)
+
+      val expectedAclsForResource = expectedAcls.get(sResource).get
+      acls.asScala.foreach { jAcl =>
+        val sAcl = Acl.fromJava(jAcl)
+        assertTrue(s"The resource should have acl $sAcl", expectedAclsForResource.contains(sAcl))
+      }
+    }
+  }
+
+  @Test
+  def testAlterAclsRequests() {
+    val resource1 = Resource.ClusterResource
+    val jResource1 = resource1.asJava
+    val acl1 = new Acl(KafkaPrincipal.ANONYMOUS, Allow, "*", Describe)
+    val jAcl1 = acl1.asJava
+    val actionRequest1 = new AlterAclsRequest.ActionRequest(AlterAclsRequest.Action.ADD, jAcl1)
+    val requests1 = Map(jResource1 -> List(actionRequest1).asJava)
+
+    validateAlterAclsRequests(new AlterAclsRequest(requests1.asJava), Map(resource1 -> Set(acl1)))
+    removeAllAcls
+
+    val resource2 = Resource.ClusterResource
+    val jResource2 = resource2.asJava
+    val acl2a = new Acl(KafkaPrincipal.ANONYMOUS, Allow, "*", Describe)
+    val jAcl2a = acl2a.asJava
+    val actionRequest2a = new AlterAclsRequest.ActionRequest(AlterAclsRequest.Action.ADD, jAcl2a)
+    val acl2b = new Acl(KafkaPrincipal.ANONYMOUS, Deny, "*", Describe)
+    val jAcl2b = acl2b.asJava
+    // Add a second acl
+    val actionRequest2b = new AlterAclsRequest.ActionRequest(AlterAclsRequest.Action.ADD, jAcl2b)
+    // Make sure deletes are processed first
+    val actionRequest2c = new AlterAclsRequest.ActionRequest(AlterAclsRequest.Action.DELETE, jAcl2a)
+    val requests2 = Map(jResource2 -> List(actionRequest2a, actionRequest2b).asJava)
+
+    validateAlterAclsRequests(new AlterAclsRequest(requests2.asJava), Map(resource2 -> Set(acl2a, acl2b)))
+    removeAllAcls
+  }
+
+  private def validateAlterAclsRequests(request: AlterAclsRequest, expectedAcls: Map[Resource, Set[Acl]]): Unit = {
+    val response = sendAlterAclsRequest(request)
+
+    val hasError =
+      !response.results.asScala.forall { case (resource, results) =>
+        results.asScala.forall(_.error == Errors.NONE)
+      }
+
+    assertFalse(s"There should be no errors", hasError)
+
+    expectedAcls.foreach { case (resource, acls) =>
+      TestUtils.waitAndVerifyAcls(acls, servers.head.apis.authorizer.get, resource)
+    }
+  }
+
+  private def sendListAclsRequest(request: ListAclsRequest): ListAclsResponse = {
+    val correlationId = -1
+
+    val serializedBytes = {
+      val header = new RequestHeader(ApiKeys.LIST_ACLS.id, 0, "", correlationId)
+      val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
+      header.writeTo(byteBuffer)
+      request.writeTo(byteBuffer)
+      byteBuffer.array()
+    }
+
+    val response = requestAndReceive(serializedBytes)
+
+    val responseBuffer = ByteBuffer.wrap(response)
+    val responseHeader = ResponseHeader.parse(responseBuffer)
+    ListAclsResponse.parse(responseBuffer)
+  }
+
+  private def sendAlterAclsRequest(request: AlterAclsRequest): AlterAclsResponse = {
+    val correlationId = -1
+
+    val serializedBytes = {
+      val header = new RequestHeader(ApiKeys.ALTER_ACLS.id, 0, "", correlationId)
+      val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
+      header.writeTo(byteBuffer)
+      request.writeTo(byteBuffer)
+      byteBuffer.array()
+    }
+
+    val response = requestAndReceive(serializedBytes)
+
+    val responseBuffer = ByteBuffer.wrap(response)
+    val responseHeader = ResponseHeader.parse(responseBuffer)
+    AlterAclsResponse.parse(responseBuffer)
+  }
+
+  def removeAllAcls() = {
+    servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
+      servers.head.apis.authorizer.get.removeAcls(resource)
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
new file mode 100644
index 00000000000..cd6f5231467
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+
+import kafka.client.ClientUtils
+import kafka.cluster.Broker
+import kafka.integration.KafkaServerTestHarness
+import kafka.network.SocketServer
+import kafka.security.auth.SimpleAclAuthorizer
+import kafka.utils._
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.junit.Before
+
+class BaseRequestTest extends KafkaServerTestHarness {
+    val numBrokers = 3
+    def generateConfigs() = {
+        val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false)
+        props.foreach { p =>
+            p.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+            p.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+            p.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+            p.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
+        }
+        props.map(KafkaConfig.fromProps)
+    }
+
+    @Before
+    override def setUp() {
+        super.setUp()
+        TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
+    }
+
+    def socketServer = {
+        servers.head.socketServer
+    }
+
+    def broker = {
+        val broker = servers.head
+        new Broker(broker.config.brokerId, broker.config.hostName, broker.boundPort())
+    }
+
+    def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
+        new Socket("localhost", s.boundPort(protocol))
+    }
+
+    def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
+        val outgoing = new DataOutputStream(socket.getOutputStream)
+        id match {
+            case Some(id) =>
+                outgoing.writeInt(request.length + 2)
+                outgoing.writeShort(id)
+            case None =>
+                outgoing.writeInt(request.length)
+        }
+        outgoing.write(request)
+        outgoing.flush()
+    }
+
+    def receiveResponse(socket: Socket): Array[Byte] = {
+        val incoming = new DataInputStream(socket.getInputStream)
+        val len = incoming.readInt()
+        val response = new Array[Byte](len)
+            incoming.readFully(response)
+        response
+    }
+
+    def requestAndReceive(request: Array[Byte], id: Option[Short] = None): Array[Byte] = {
+        val plainSocket = connect()
+        try {
+            sendRequest(plainSocket, request, id)
+            receiveResponse(plainSocket)
+        } finally {
+            plainSocket.close()
+        }
+    }
+
+    def topicExists(topic: String, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Boolean = {
+        val metadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(broker.getBrokerEndPoint(protocol)),
+            "topicExists", 2000, 0).topicsMetadata
+
+        metadata.exists(p => p.topic.equals(topic) && p.errorCode == Errors.NONE.code)
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Implement KIP-140 RPCs and APIs for creating, altering, and listing ACLs
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-3266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3266
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Grant Henke
>            Assignee: Colin P. McCabe
>             Fix For: 0.11.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)