You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 16:37:44 UTC

[3/3] kafka git commit: KAFKA-5265; Move ACLs, Config, Topic classes into org.apache.kafka.common

KAFKA-5265; Move ACLs, Config, Topic classes into org.apache.kafka.common

Also introduce TopicConfig.

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3120 from cmccabe/KAFKA-5265


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da9a171c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da9a171c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da9a171c

Branch: refs/heads/trunk
Commit: da9a171c99eb456378bdda95a563d09dfd9af4d8
Parents: 9323a75
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Wed May 31 16:46:43 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 17:35:31 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  11 ++
 .../kafka/clients/admin/AccessControlEntry.java |  86 ----------
 .../clients/admin/AccessControlEntryData.java   | 105 ------------
 .../clients/admin/AccessControlEntryFilter.java | 117 -------------
 .../apache/kafka/clients/admin/AclBinding.java  |  74 ---------
 .../kafka/clients/admin/AclBindingFilter.java   |  89 ----------
 .../kafka/clients/admin/AclOperation.java       | 137 ----------------
 .../kafka/clients/admin/AclPermissionType.java  |  92 -----------
 .../apache/kafka/clients/admin/AdminClient.java |  80 ++++-----
 .../kafka/clients/admin/AlterConfigsResult.java |  43 +++++
 .../clients/admin/AlterConfigsResults.java      |  42 -----
 .../kafka/clients/admin/ApiVersionsResult.java  |  63 +++++++
 .../kafka/clients/admin/ApiVersionsResults.java |  63 -------
 .../kafka/clients/admin/ConfigResource.java     |  65 --------
 .../kafka/clients/admin/CreateAclsResult.java   |  49 ++++++
 .../kafka/clients/admin/CreateAclsResults.java  |  48 ------
 .../kafka/clients/admin/CreateTopicResults.java |  49 ------
 .../kafka/clients/admin/CreateTopicsResult.java |  49 ++++++
 .../kafka/clients/admin/DeleteAclsResult.java   | 109 +++++++++++++
 .../kafka/clients/admin/DeleteAclsResults.java  | 107 ------------
 .../kafka/clients/admin/DeleteTopicResults.java |  50 ------
 .../kafka/clients/admin/DeleteTopicsResult.java |  50 ++++++
 .../kafka/clients/admin/DescribeAclsResult.java |  38 +++++
 .../clients/admin/DescribeAclsResults.java      |  37 -----
 .../clients/admin/DescribeClusterResult.java    |  65 ++++++++
 .../clients/admin/DescribeClusterResults.java   |  65 --------
 .../clients/admin/DescribeConfigsResult.java    |  60 +++++++
 .../clients/admin/DescribeConfigsResults.java   |  59 -------
 .../clients/admin/DescribeTopicsResult.java     |  68 ++++++++
 .../clients/admin/DescribeTopicsResults.java    |  68 --------
 .../kafka/clients/admin/KafkaAdminClient.java   |  52 +++---
 .../kafka/clients/admin/ListTopicsResult.java   |  67 ++++++++
 .../kafka/clients/admin/ListTopicsResults.java  |  67 --------
 .../apache/kafka/clients/admin/Resource.java    |  74 ---------
 .../kafka/clients/admin/ResourceFilter.java     |  90 ----------
 .../kafka/clients/admin/ResourceType.java       | 102 ------------
 .../kafka/clients/admin/TopicDescription.java   |   1 +
 .../kafka/clients/admin/TopicPartitionInfo.java |  58 -------
 .../apache/kafka/common/TopicPartitionInfo.java |  57 +++++++
 .../kafka/common/acl/AccessControlEntry.java    |  86 ++++++++++
 .../common/acl/AccessControlEntryData.java      | 105 ++++++++++++
 .../common/acl/AccessControlEntryFilter.java    | 117 +++++++++++++
 .../org/apache/kafka/common/acl/AclBinding.java |  77 +++++++++
 .../kafka/common/acl/AclBindingFilter.java      |  93 +++++++++++
 .../apache/kafka/common/acl/AclOperation.java   | 137 ++++++++++++++++
 .../kafka/common/acl/AclPermissionType.java     |  92 +++++++++++
 .../kafka/common/config/ConfigResource.java     |  65 ++++++++
 .../apache/kafka/common/config/TopicConfig.java | 163 +++++++++++++++++++
 .../common/requests/CreateAclsRequest.java      |   6 +-
 .../common/requests/DeleteAclsRequest.java      |   6 +-
 .../common/requests/DeleteAclsResponse.java     |   6 +-
 .../common/requests/DescribeAclsRequest.java    |   8 +-
 .../common/requests/DescribeAclsResponse.java   |   6 +-
 .../kafka/common/requests/RequestUtils.java     |  14 +-
 .../apache/kafka/common/resource/Resource.java  |  75 +++++++++
 .../kafka/common/resource/ResourceFilter.java   |  91 +++++++++++
 .../kafka/common/resource/ResourceType.java     | 102 ++++++++++++
 .../kafka/clients/admin/AclBindingTest.java     | 110 -------------
 .../kafka/clients/admin/AclOperationTest.java   |  89 ----------
 .../clients/admin/AclPermissionTypeTest.java    |  80 ---------
 .../clients/admin/KafkaAdminClientTest.java     |  15 +-
 .../kafka/clients/admin/ResourceTypeTest.java   |  82 ----------
 .../apache/kafka/common/acl/AclBindingTest.java | 113 +++++++++++++
 .../kafka/common/acl/AclOperationTest.java      |  89 ++++++++++
 .../kafka/common/acl/AclPermissionTypeTest.java |  81 +++++++++
 .../common/requests/RequestResponseTest.java    |  18 +-
 .../kafka/common/resource/ResourceTypeTest.java |  82 ++++++++++
 core/src/main/scala/kafka/log/LogConfig.scala   | 155 +++++++-----------
 .../scala/kafka/security/auth/Operation.scala   |   2 +-
 .../kafka/security/auth/PermissionType.scala    |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../api/KafkaAdminClientIntegrationTest.scala   |   4 +
 .../api/SaslSslAdminClientIntegrationTest.scala |   4 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   3 +-
 74 files changed, 2481 insertions(+), 2306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index fdedef8..ab4f15d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -50,6 +50,11 @@
     <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
     <allow pkg="org.apache.kafka.test" />
 
+    <subpackage name="acl">
+      <allow pkg="org.apache.kafka.common.acl" />
+      <allow pkg="org.apache.kafka.common.resource" />
+    </subpackage>
+
     <subpackage name="config">
       <allow pkg="org.apache.kafka.common.config" />
       <!-- for testing -->
@@ -68,6 +73,10 @@
       <allow pkg="org.apache.kafka.common.security" />
     </subpackage>
 
+    <subpackage name="resource">
+      <allow pkg="org.apache.kafka.common.resource" />
+    </subpackage>
+
     <subpackage name="security">
       <allow pkg="org.apache.kafka.common.annotation" />
       <allow pkg="org.apache.kafka.common.network" />
@@ -109,9 +118,11 @@
 
     <subpackage name="requests">
       <allow pkg="org.apache.kafka.clients.admin" />
+      <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
       <allow pkg="org.apache.kafka.common.requests" />
+      <allow pkg="org.apache.kafka.common.resource" />
       <allow pkg="org.apache.kafka.common.record" />
       <!-- for testing -->
       <allow pkg="org.apache.kafka.common.errors" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
deleted file mode 100644
index 0c36a21..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntry.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * Represents an access control entry.  ACEs are a tuple of principal, host,
- * operation, and permissionType.
- */
-public class AccessControlEntry {
-    final AccessControlEntryData data;
-
-    public AccessControlEntry(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
-        Objects.requireNonNull(principal);
-        Objects.requireNonNull(host);
-        Objects.requireNonNull(operation);
-        assert operation != AclOperation.ANY;
-        Objects.requireNonNull(permissionType);
-        assert permissionType != AclPermissionType.ANY;
-        this.data = new AccessControlEntryData(principal, host, operation, permissionType);
-    }
-
-    public String principal() {
-        return data.principal();
-    }
-
-    public String host() {
-        return data.host();
-    }
-
-    public AclOperation operation() {
-        return data.operation();
-    }
-
-    public AclPermissionType permissionType() {
-        return data.permissionType();
-    }
-
-    /**
-     * Create a filter which matches only this AccessControlEntry.
-     */
-    public AccessControlEntryFilter toFilter() {
-        return new AccessControlEntryFilter(data);
-    }
-
-    @Override
-    public String toString() {
-        return data.toString();
-    }
-
-    /**
-     * Return true if this AclResource has any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return data.unknown();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AccessControlEntry))
-            return false;
-        AccessControlEntry other = (AccessControlEntry) o;
-        return data.equals(other.data);
-    }
-
-    @Override
-    public int hashCode() {
-        return data.hashCode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
deleted file mode 100644
index 81f57ad..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryData.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * An internal, private class which contains the data stored in AccessControlEntry and
- * AccessControlEntryFilter objects.
- */
-class AccessControlEntryData {
-    private final String principal;
-    private final String host;
-    private final AclOperation operation;
-    private final AclPermissionType permissionType;
-
-    AccessControlEntryData(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
-        this.principal = principal;
-        this.host = host;
-        this.operation = operation;
-        this.permissionType = permissionType;
-    }
-
-    String principal() {
-        return principal;
-    }
-
-    String host() {
-        return host;
-    }
-
-    AclOperation operation() {
-        return operation;
-    }
-
-    AclPermissionType permissionType() {
-        return permissionType;
-    }
-
-    /**
-     * Returns a string describing an ANY or UNKNOWN field, or null if there is
-     * no such field.
-     */
-    public String findIndefiniteField() {
-        if (principal() == null)
-            return "Principal is NULL";
-        if (host() == null)
-            return "Host is NULL";
-        if (operation() == AclOperation.ANY)
-            return "Operation is ANY";
-        if (operation() == AclOperation.UNKNOWN)
-            return "Operation is UNKNOWN";
-        if (permissionType() == AclPermissionType.ANY)
-            return "Permission type is ANY";
-        if (permissionType() == AclPermissionType.UNKNOWN)
-            return "Permission type is UNKNOWN";
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return "(principal=" + (principal == null ? "<any>" : principal) +
-               ", host=" + (host == null ? "<any>" : host) +
-               ", operation=" + operation +
-               ", permissionType=" + permissionType + ")";
-    }
-
-    /**
-     * Return true if there are any UNKNOWN components.
-     */
-    boolean unknown() {
-        return operation.unknown() || permissionType.unknown();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AccessControlEntryData))
-            return false;
-        AccessControlEntryData other = (AccessControlEntryData) o;
-        return Objects.equals(principal, other.principal) &&
-            Objects.equals(host, other.host) &&
-            Objects.equals(operation, other.operation) &&
-            Objects.equals(permissionType, other.permissionType);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(principal, host, operation, permissionType);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
deleted file mode 100644
index 0ec1027..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AccessControlEntryFilter.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * Represents a filter which matches access control entries.
- */
-public class AccessControlEntryFilter {
-    private final AccessControlEntryData data;
-
-    public static final AccessControlEntryFilter ANY =
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY);
-
-    public AccessControlEntryFilter(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
-        Objects.requireNonNull(operation);
-        Objects.requireNonNull(permissionType);
-        this.data = new AccessControlEntryData(principal, host, operation, permissionType);
-    }
-
-    /**
-     * This is a non-public constructor used in AccessControlEntry#toFilter
-     *
-     * @param data     The access control data.
-     */
-    AccessControlEntryFilter(AccessControlEntryData data) {
-        this.data = data;
-    }
-
-    public String principal() {
-        return data.principal();
-    }
-
-    public String host() {
-        return data.host();
-    }
-
-    public AclOperation operation() {
-        return data.operation();
-    }
-
-    public AclPermissionType permissionType() {
-        return data.permissionType();
-    }
-
-    @Override
-    public String toString() {
-        return data.toString();
-    }
-
-    /**
-     * Return true if there are any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return data.unknown();
-    }
-
-    /**
-     * Returns true if this filter matches the given AccessControlEntry.
-     */
-    public boolean matches(AccessControlEntry other) {
-        if ((principal() != null) && (!data.principal().equals(other.principal())))
-            return false;
-        if ((host() != null) && (!host().equals(other.host())))
-            return false;
-        if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation())))
-            return false;
-        if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType())))
-            return false;
-        return true;
-    }
-
-    /**
-     * Returns true if this filter could only match one ACE-- in other words, if
-     * there are no ANY or UNKNOWN fields.
-     */
-    public boolean matchesAtMostOne() {
-        return findIndefiniteField() == null;
-    }
-
-    /**
-     * Returns a string describing an ANY or UNKNOWN field, or null if there is
-     * no such field.
-     */
-    public String findIndefiniteField() {
-        return data.findIndefiniteField();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AccessControlEntryFilter))
-            return false;
-        AccessControlEntryFilter other = (AccessControlEntryFilter) o;
-        return data.equals(other.data);
-    }
-
-    @Override
-    public int hashCode() {
-        return data.hashCode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
deleted file mode 100644
index 45761b4..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclBinding.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * Represents a binding between a resource and an access control entry.
- */
-public class AclBinding {
-    private final Resource resource;
-    private final AccessControlEntry entry;
-
-    public AclBinding(Resource resource, AccessControlEntry entry) {
-        Objects.requireNonNull(resource);
-        this.resource = resource;
-        Objects.requireNonNull(entry);
-        this.entry = entry;
-    }
-
-    /**
-     * Return true if this binding has any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return resource.unknown() || entry.unknown();
-    }
-
-    public Resource resource() {
-        return resource;
-    }
-
-    public final AccessControlEntry entry() {
-        return entry;
-    }
-
-    /**
-     * Create a filter which matches only this AclBinding.
-     */
-    public AclBindingFilter toFilter() {
-        return new AclBindingFilter(resource.toFilter(), entry.toFilter());
-    }
-
-    @Override
-    public String toString() {
-        return "(resource=" + resource + ", entry=" + entry + ")";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AclBinding))
-            return false;
-        AclBinding other = (AclBinding) o;
-        return resource.equals(other.resource) && entry.equals(other.entry);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(resource, entry);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
deleted file mode 100644
index 5e4142d..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclBindingFilter.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * A filter which can match AclBinding objects.
- */
-public class AclBindingFilter {
-    private final ResourceFilter resourceFilter;
-    private final AccessControlEntryFilter entryFilter;
-
-    /**
-     * A filter which matches any ACL binding.
-     */
-    public static final AclBindingFilter ANY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
-
-    public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) {
-        Objects.requireNonNull(resourceFilter);
-        this.resourceFilter = resourceFilter;
-        Objects.requireNonNull(entryFilter);
-        this.entryFilter = entryFilter;
-    }
-
-    /**
-     * Return true if this filter has any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return resourceFilter.unknown() || entryFilter.unknown();
-    }
-
-    public ResourceFilter resourceFilter() {
-        return resourceFilter;
-    }
-
-    public final AccessControlEntryFilter entryFilter() {
-        return entryFilter;
-    }
-
-    @Override
-    public String toString() {
-        return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AclBindingFilter))
-            return false;
-        AclBindingFilter other = (AclBindingFilter) o;
-        return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter);
-    }
-
-    public boolean matchesAtMostOne() {
-        return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne();
-    }
-
-    public String findIndefiniteField() {
-        String indefinite = resourceFilter.findIndefiniteField();
-        if (indefinite != null)
-            return indefinite;
-        return entryFilter.findIndefiniteField();
-    }
-
-    public boolean matches(AclBinding binding) {
-        return resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry());
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(resourceFilter, entryFilter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
deleted file mode 100644
index 0c3ff50..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclOperation.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.HashMap;
-import java.util.Locale;
-
-/**
- * Represents an operation which an ACL grants or denies permission to perform.
- */
-public enum AclOperation {
-    /**
-     * Represents any AclOperation which this client cannot understand, perhaps because this
-     * client is too old.
-     */
-    UNKNOWN((byte) 0),
-
-    /**
-     * In a filter, matches any AclOperation.
-     */
-    ANY((byte) 1),
-
-    /**
-     * ALL operation.
-     */
-    ALL((byte) 2),
-
-    /**
-     * READ operation.
-     */
-    READ((byte) 3),
-
-    /**
-     * WRITE operation.
-     */
-    WRITE((byte) 4),
-
-    /**
-     * CREATE operation.
-     */
-    CREATE((byte) 5),
-
-    /**
-     * DELETE operation.
-     */
-    DELETE((byte) 6),
-
-    /**
-     * ALTER operation.
-     */
-    ALTER((byte) 7),
-
-    /**
-     * DESCRIBE operation.
-     */
-    DESCRIBE((byte) 8),
-
-    /**
-     * CLUSTER_ACTION operation.
-     */
-    CLUSTER_ACTION((byte) 9),
-
-    /**
-     * DESCRIBE_CONFIGS operation.
-     */
-    DESCRIBE_CONFIGS((byte) 10),
-
-    /**
-     * ALTER_CONFIGS operation.
-     */
-    ALTER_CONFIGS((byte) 11),
-
-    /**
-     * IDEMPOTENT_WRITE operation.
-     */
-    IDEMPOTENT_WRITE((byte) 12);
-
-    private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
-
-    static {
-        for (AclOperation operation : AclOperation.values()) {
-            CODE_TO_VALUE.put(operation.code, operation);
-        }
-    }
-
-    /**
-     * Parse the given string as an ACL operation.
-     *
-     * @param str    The string to parse.
-     *
-     * @return       The AclOperation, or UNKNOWN if the string could not be matched.
-     */
-    public static AclOperation fromString(String str) throws IllegalArgumentException {
-        try {
-            return AclOperation.valueOf(str.toUpperCase(Locale.ROOT));
-        } catch (IllegalArgumentException e) {
-            return UNKNOWN;
-        }
-    }
-
-    public static AclOperation fromCode(byte code) {
-        AclOperation operation = CODE_TO_VALUE.get(code);
-        if (operation == null) {
-            return UNKNOWN;
-        }
-        return operation;
-    }
-
-    private final byte code;
-
-    AclOperation(byte code) {
-        this.code = code;
-    }
-
-    public byte code() {
-        return code;
-    }
-
-    public boolean unknown() {
-        return this == UNKNOWN;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
deleted file mode 100644
index 9181c6b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AclPermissionType.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.HashMap;
-import java.util.Locale;
-
-/**
- * Represents whether an ACL grants or denies permissions.
- */
-public enum AclPermissionType {
-    /**
-     * Represents any AclPermissionType which this client cannot understand,
-     * perhaps because this client is too old.
-     */
-    UNKNOWN((byte) 0),
-
-    /**
-     * In a filter, matches any AclPermissionType.
-     */
-    ANY((byte) 1),
-
-    /**
-     * Disallows access.
-     */
-    DENY((byte) 2),
-
-    /**
-     * Grants access.
-     */
-    ALLOW((byte) 3);
-
-    private final static HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new HashMap<>();
-
-    static {
-        for (AclPermissionType permissionType : AclPermissionType.values()) {
-            CODE_TO_VALUE.put(permissionType.code, permissionType);
-        }
-    }
-
-    /**
-    * Parse the given string as an ACL permission.
-    *
-    * @param str    The string to parse.
-    *
-    * @return       The AclPermissionType, or UNKNOWN if the string could not be matched.
-    */
-    public static AclPermissionType fromString(String str) {
-        try {
-            return AclPermissionType.valueOf(str.toUpperCase(Locale.ROOT));
-        } catch (IllegalArgumentException e) {
-            return UNKNOWN;
-        }
-    }
-
-    public static AclPermissionType fromCode(byte code) {
-        AclPermissionType permissionType = CODE_TO_VALUE.get(code);
-        if (permissionType == null) {
-            return UNKNOWN;
-        }
-        return permissionType;
-    }
-
-    private final byte code;
-
-    AclPermissionType(byte code) {
-        this.code = code;
-    }
-
-    public byte code() {
-        return code;
-    }
-
-    public boolean unknown() {
-        return this == UNKNOWN;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 96b8ebb..8ae3249 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -18,7 +18,10 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
 
 import java.util.Collection;
 import java.util.Map;
@@ -81,9 +84,9 @@ public abstract class AdminClient implements AutoCloseable {
      * Create a batch of new topics with the default options.
      *
      * @param newTopics         The new topics to create.
-     * @return                  The CreateTopicsResults.
+     * @return                  The CreateTopicsResult.
      */
-    public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
+    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
         return createTopics(newTopics, new CreateTopicsOptions());
     }
 
@@ -97,9 +100,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param newTopics         The new topics to create.
      * @param options           The options to use when creating the new topics.
-     * @return                  The CreateTopicsResults.
+     * @return                  The CreateTopicsResult.
      */
-    public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
+    public abstract CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
                                                     CreateTopicsOptions options);
 
     /**
@@ -107,9 +110,9 @@ public abstract class AdminClient implements AutoCloseable {
      * but uses the default options.
      *
      * @param topics            The topic names to delete.
-     * @return                  The DeleteTopicsResults.
+     * @return                  The DeleteTopicsResult.
      */
-    public DeleteTopicResults deleteTopics(Collection<String> topics) {
+    public DeleteTopicsResult deleteTopics(Collection<String> topics) {
         return deleteTopics(topics, new DeleteTopicsOptions());
     }
 
@@ -127,16 +130,16 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param topics            The topic names to delete.
      * @param options           The options to use when deleting the topics.
-     * @return                  The DeleteTopicsResults.
+     * @return                  The DeleteTopicsResult.
      */
-    public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+    public abstract DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
 
     /**
      * List the topics available in the cluster with the default options.
      *
-     * @return                  The ListTopicsResults.
+     * @return                  The ListTopicsResult.
      */
-    public ListTopicsResults listTopics() {
+    public ListTopicsResult listTopics() {
         return listTopics(new ListTopicsOptions());
     }
 
@@ -144,9 +147,9 @@ public abstract class AdminClient implements AutoCloseable {
      * List the topics available in the cluster.
      *
      * @param options           The options to use when listing the topics.
-     * @return                  The ListTopicsResults.
+     * @return                  The ListTopicsResult.
      */
-    public abstract ListTopicsResults listTopics(ListTopicsOptions options);
+    public abstract ListTopicsResult listTopics(ListTopicsOptions options);
 
     /**
      * Describe some topics in the cluster, with the default options.
@@ -155,9 +158,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param topicNames        The names of the topics to describe.
      *
-     * @return                  The DescribeTopicsResults.
+     * @return                  The DescribeTopicsResult.
      */
-    public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
+    public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
         return describeTopics(topicNames, new DescribeTopicsOptions());
     }
 
@@ -173,17 +176,17 @@ public abstract class AdminClient implements AutoCloseable {
      * @param topicNames        The names of the topics to describe.
      * @param options           The options to use when describing the topic.
      *
-     * @return                  The DescribeTopicsResults.
+     * @return                  The DescribeTopicsResult.
      */
-    public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
+    public abstract DescribeTopicsResult describeTopics(Collection<String> topicNames,
                                                          DescribeTopicsOptions options);
 
     /**
      * Get information about the nodes in the cluster, using the default options.
      *
-     * @return                  The DescribeClusterResults.
+     * @return                  The DescribeClusterResult.
      */
-    public DescribeClusterResults describeCluster() {
+    public DescribeClusterResult describeCluster() {
         return describeCluster(new DescribeClusterOptions());
     }
 
@@ -191,18 +194,18 @@ public abstract class AdminClient implements AutoCloseable {
      * Get information about the nodes in the cluster.
      *
      * @param options           The options to use when getting information about the cluster.
-     * @return                  The DescribeClusterResults.
+     * @return                  The DescribeClusterResult.
      */
-    public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);
+    public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);
 
     /**
      * Get information about the api versions of nodes in the cluster with the default options.
      * See {@link AdminClient#apiVersions(Collection<Node>, ApiVersionsOptions)}
      *
      * @param nodes             The nodes to get information about, or null to get information about all nodes.
-     * @return                  The ApiVersionsResults.
+     * @return                  The ApiVersionsResult.
      */
-    public ApiVersionsResults apiVersions(Collection<Node> nodes) {
+    public ApiVersionsResult apiVersions(Collection<Node> nodes) {
         return apiVersions(nodes, new ApiVersionsOptions());
     }
 
@@ -211,19 +214,18 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param nodes             The nodes to get information about, or null to get information about all nodes.
      * @param options           The options to use when getting api versions of the nodes.
-     * @return                  The ApiVersionsResults.
+     * @return                  The ApiVersionsResult.
      */
-    public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
+    public abstract ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
 
     /**
-<<<<<<< HEAD
      * Similar to #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions),
      * but uses the default options.
      *
      * @param filter            The filter to use.
      * @return                  The DeleteAclsResult.
      */
-    public DescribeAclsResults describeAcls(AclBindingFilter filter) {
+    public DescribeAclsResult describeAcls(AclBindingFilter filter) {
         return describeAcls(filter, new DescribeAclsOptions());
     }
 
@@ -237,7 +239,7 @@ public abstract class AdminClient implements AutoCloseable {
      * @param options           The options to use when listing the ACLs.
      * @return                  The DeleteAclsResult.
      */
-    public abstract DescribeAclsResults describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
+    public abstract DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
 
     /**
      * Similar to #{@link AdminClient#createAcls(Collection<AclBinding>, CreateAclsOptions),
@@ -246,7 +248,7 @@ public abstract class AdminClient implements AutoCloseable {
      * @param acls              The ACLs to create
      * @return                  The CreateAclsResult.
      */
-    public CreateAclsResults createAcls(Collection<AclBinding> acls) {
+    public CreateAclsResult createAcls(Collection<AclBinding> acls) {
         return createAcls(acls, new CreateAclsOptions());
     }
 
@@ -260,7 +262,7 @@ public abstract class AdminClient implements AutoCloseable {
      * @param options           The options to use when creating the ACLs.
      * @return                  The CreateAclsResult.
      */
-    public abstract CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
+    public abstract CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
 
     /**
      * Similar to #{@link AdminClient#deleteAcls(Collection<AclBinding>, DeleteAclsOptions),
@@ -269,7 +271,7 @@ public abstract class AdminClient implements AutoCloseable {
      * @param filters           The filters to use.
      * @return                  The DeleteAclsResult.
      */
-    public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters) {
+    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
         return deleteAcls(filters, new DeleteAclsOptions());
     }
 
@@ -280,7 +282,7 @@ public abstract class AdminClient implements AutoCloseable {
      * @param options           The options to use when deleting the ACLs.
      * @return                  The DeleteAclsResult.
      */
-    public abstract DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
+    public abstract DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
 
 
      /**
@@ -289,9 +291,9 @@ public abstract class AdminClient implements AutoCloseable {
      * See {@link #describeConfigs(Collection, DescribeConfigsOptions)} for more details.
      *
      * @param resources         The resources (topic and broker resource types are currently supported)
-     * @return                  The DescribeConfigsResults
+     * @return                  The DescribeConfigsResult
      */
-    public DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources) {
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
         return describeConfigs(resources, new DescribeConfigsOptions());
     }
 
@@ -308,9 +310,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param resources         The resources (topic and broker resource types are currently supported)
      * @param options           The options to use when describing configs
-     * @return                  The DescribeConfigsResults
+     * @return                  The DescribeConfigsResult
      */
-    public abstract DescribeConfigsResults describeConfigs(Collection<ConfigResource> resources,
+    public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
                                                            DescribeConfigsOptions options);
 
     /**
@@ -320,9 +322,9 @@ public abstract class AdminClient implements AutoCloseable {
      *
      * @param configs         The resources with their configs (topic is the only resource type with configs that can
      *                        be updated currently)
-     * @return                The AlterConfigsResults
+     * @return                The AlterConfigsResult
      */
-    public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs) {
+    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
         return alterConfigs(configs, new AlterConfigsOptions());
     }
 
@@ -335,8 +337,8 @@ public abstract class AdminClient implements AutoCloseable {
      * @param configs         The resources with their configs (topic is the only resource type with configs that can
      *                        be updated currently)
      * @param options         The options to use when describing configs
-     * @return                The AlterConfigsResults
+     * @return                The AlterConfigsResult
      */
-    public abstract AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
+    public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
new file mode 100644
index 0000000..19d7946
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.Map;
+
+@InterfaceStability.Unstable
+public class AlterConfigsResult {
+
+    private final Map<ConfigResource, KafkaFuture<Void>> futures;
+
+    AlterConfigsResult(Map<ConfigResource, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<ConfigResource, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
deleted file mode 100644
index 3f44cfd..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResults.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-
-@InterfaceStability.Unstable
-public class AlterConfigsResults {
-
-    private final Map<ConfigResource, KafkaFuture<Void>> futures;
-
-    AlterConfigsResults(Map<ConfigResource, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    public Map<ConfigResource, KafkaFuture<Void>> results() {
-        return futures;
-    }
-
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java
new file mode 100644
index 0000000..62b6d7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Results of the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsResult {
+    private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
+
+    ApiVersionsResult(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<Node, KafkaFuture<NodeApiVersions>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Map<Node, NodeApiVersions>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
+                @Override
+                public Map<Node, NodeApiVersions> apply(Void v) {
+                    Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
+                    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
+                        try {
+                            versions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return versions;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
deleted file mode 100644
index 456c64d..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Results of the apiVersions call.
- */
-@InterfaceStability.Unstable
-public class ApiVersionsResults {
-    private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
-
-    ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
-        this.futures = futures;
-    }
-
-    public Map<Node, KafkaFuture<NodeApiVersions>> results() {
-        return futures;
-    }
-
-    public KafkaFuture<Map<Node, NodeApiVersions>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
-                @Override
-                public Map<Node, NodeApiVersions> apply(Void v) {
-                    Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
-                    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
-                        try {
-                            versions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures
-                            // completed successfully.
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    return versions;
-                }
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
deleted file mode 100644
index 61af4a8..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigResource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-public final class ConfigResource {
-
-    public enum Type {
-        BROKER, TOPIC, UNKNOWN;
-    }
-
-    private final Type type;
-    private final String name;
-
-    public ConfigResource(Type type, String name) {
-        this.type = type;
-        this.name = name;
-    }
-
-    public Type type() {
-        return type;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        ConfigResource that = (ConfigResource) o;
-
-        return type == that.type && name.equals(that.name);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = type.hashCode();
-        result = 31 * result + name.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "ConfigResource{type=" + type + ", name='" + name + "'}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
new file mode 100644
index 0000000..de83509
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.acl.AclBinding;
+
+import java.util.Map;
+
+/**
+ * The result of the createAcls call.
+ */
+public class CreateAclsResult {
+    private final Map<AclBinding, KafkaFuture<Void>> futures;
+
+    CreateAclsResult(Map<AclBinding, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<AclBinding, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
deleted file mode 100644
index 6908037..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResults.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-
-import java.util.Map;
-
-/**
- * The result of the createAcls call.
- */
-public class CreateAclsResults {
-    private final Map<AclBinding, KafkaFuture<Void>> futures;
-
-    CreateAclsResults(Map<AclBinding, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic names to futures which can be used to check the status of
-     * individual deletions.
-     */
-    public Map<AclBinding, KafkaFuture<Void>> results() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds only if all the topic deletions succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
deleted file mode 100644
index 03da7d0..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-
-/**
- * The result of newTopics.
- */
-@InterfaceStability.Unstable
-public class CreateTopicResults {
-    private final Map<String, KafkaFuture<Void>> futures;
-
-    CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic names to futures, which can be used to check the status of individual
-     * topic creations.
-     */
-    public Map<String, KafkaFuture<Void>> results() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds if all the topic creations succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
new file mode 100644
index 0000000..49bf21c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of newTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicsResult {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    CreateTopicsResult(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures, which can be used to check the status of individual
+     * topic creations.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if all the topic creations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
new file mode 100644
index 0000000..da92752
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The result of the deleteAcls call.
+ */
+public class DeleteAclsResult {
+    public static class FilterResult {
+        private final AclBinding acl;
+        private final ApiException exception;
+
+        FilterResult(AclBinding acl, ApiException exception) {
+            this.acl = acl;
+            this.exception = exception;
+        }
+
+        public AclBinding acl() {
+            return acl;
+        }
+
+        public ApiException exception() {
+            return exception;
+        }
+    }
+
+    public static class FilterResults {
+        private final List<FilterResult> acls;
+
+        FilterResults(List<FilterResult> acls) {
+            this.acls = acls;
+        }
+
+        public List<FilterResult> acls() {
+            return acls;
+        }
+    }
+
+    private final Map<AclBindingFilter, KafkaFuture<FilterResults>> futures;
+
+    DeleteAclsResult(Map<AclBindingFilter, KafkaFuture<FilterResults>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the ACLs deletions succeed, and which contains all the deleted ACLs.
+     * Note that it if the filters don't match any ACLs, this is not considered an error.
+     */
+    public KafkaFuture<Collection<AclBinding>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
+            new KafkaFuture.Function<Void, Collection<AclBinding>>() {
+                @Override
+                public Collection<AclBinding> apply(Void v) {
+                    List<AclBinding> acls = new ArrayList<>();
+                    for (Map.Entry<AclBindingFilter, KafkaFuture<FilterResults>> entry : futures.entrySet()) {
+                        FilterResults results;
+                        try {
+                            results = entry.getValue().get();
+                        } catch (Throwable e) {
+                            // This should be unreachable, since the future returned by KafkaFuture#allOf should
+                            // have failed if any Future failed.
+                            throw new KafkaException("DeleteAclsResult#all: internal error", e);
+                        }
+                        for (FilterResult result : results.acls()) {
+                            if (result.exception() != null) {
+                                throw result.exception();
+                            }
+                            acls.add(result.acl());
+                        }
+                    }
+                    return acls;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
deleted file mode 100644
index dfb2e6b..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResults.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.errors.ApiException;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The result of the deleteAcls call.
- */
-public class DeleteAclsResults {
-    public static class FilterResult {
-        private final AclBinding acl;
-        private final ApiException exception;
-
-        FilterResult(AclBinding acl, ApiException exception) {
-            this.acl = acl;
-            this.exception = exception;
-        }
-
-        public AclBinding acl() {
-            return acl;
-        }
-
-        public ApiException exception() {
-            return exception;
-        }
-    }
-
-    public static class FilterResults {
-        private final List<FilterResult> acls;
-
-        FilterResults(List<FilterResult> acls) {
-            this.acls = acls;
-        }
-
-        public List<FilterResult> acls() {
-            return acls;
-        }
-    }
-
-    private final Map<AclBindingFilter, KafkaFuture<FilterResults>> futures;
-
-    DeleteAclsResults(Map<AclBindingFilter, KafkaFuture<FilterResults>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic names to futures which can be used to check the status of
-     * individual deletions.
-     */
-    public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds only if all the ACLs deletions succeed, and which contains all the deleted ACLs.
-     * Note that it if the filters don't match any ACLs, this is not considered an error.
-     */
-    public KafkaFuture<Collection<AclBinding>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
-            new KafkaFuture.Function<Void, Collection<AclBinding>>() {
-                @Override
-                public Collection<AclBinding> apply(Void v) {
-                    List<AclBinding> acls = new ArrayList<>();
-                    for (Map.Entry<AclBindingFilter, KafkaFuture<FilterResults>> entry : futures.entrySet()) {
-                        FilterResults results;
-                        try {
-                            results = entry.getValue().get();
-                        } catch (Throwable e) {
-                            // This should be unreachable, since the future returned by KafkaFuture#allOf should
-                            // have failed if any Future failed.
-                            throw new KafkaException("DeleteAclsResults#all: internal error", e);
-                        }
-                        for (FilterResult result : results.acls()) {
-                            if (result.exception() != null) {
-                                throw result.exception();
-                            }
-                            acls.add(result.acl());
-                        }
-                    }
-                    return acls;
-                }
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
deleted file mode 100644
index 3dd4889..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-
-/**
- * The result of the deleteTopics call.
- */
-@InterfaceStability.Unstable
-public class DeleteTopicResults {
-    final Map<String, KafkaFuture<Void>> futures;
-
-    DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic names to futures which can be used to check the status of
-     * individual deletions.
-     */
-    public Map<String, KafkaFuture<Void>> results() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds only if all the topic deletions succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
new file mode 100644
index 0000000..169ee96
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the deleteTopics call.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicsResult {
+    final Map<String, KafkaFuture<Void>> futures;
+
+    DeleteTopicsResult(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
new file mode 100644
index 0000000..6d65da6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.acl.AclBinding;
+
+import java.util.Collection;
+
+/**
+ * The result of the describeAcls call.
+ */
+public class DescribeAclsResult {
+    private final KafkaFuture<Collection<AclBinding>> future;
+
+    DescribeAclsResult(KafkaFuture<Collection<AclBinding>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<AclBinding>> all() {
+        return future;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
deleted file mode 100644
index dea98ab..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResults.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-
-import java.util.Collection;
-
-/**
- * The result of the describeAcls call.
- */
-public class DescribeAclsResults {
-    private final KafkaFuture<Collection<AclBinding>> future;
-
-    DescribeAclsResults(KafkaFuture<Collection<AclBinding>> future) {
-        this.future = future;
-    }
-
-    public KafkaFuture<Collection<AclBinding>> all() {
-        return future;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
new file mode 100644
index 0000000..34be2f4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResult.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The results of the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterResult {
+    private final KafkaFuture<Collection<Node>> nodes;
+    private final KafkaFuture<Node> controller;
+    private final KafkaFuture<String> clusterId;
+
+    DescribeClusterResult(KafkaFuture<Collection<Node>> nodes,
+                           KafkaFuture<Node> controller,
+                           KafkaFuture<String> clusterId) {
+        this.nodes = nodes;
+        this.controller = controller;
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Returns a future which yields a collection of nodes.
+     */
+    public KafkaFuture<Collection<Node>> nodes() {
+        return nodes;
+    }
+
+    /**
+     * Returns a future which yields the current controller id.
+     * Note that this may yield null, if the controller ID is not yet known.
+     */
+    public KafkaFuture<Node> controller() {
+        return controller;
+    }
+
+    /**
+     * Returns a future which yields the current cluster Id.
+     * Note that this may yield null, if the cluster version is too old.
+     */
+    public KafkaFuture<String> clusterId() {
+        return clusterId;
+    }
+}