You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/15 01:05:54 UTC

kafka git commit: KAFKA-5275; AdminClient API consistency

Repository: kafka
Updated Branches:
  refs/heads/trunk bcf5da0ba -> 0f60617fa


KAFKA-5275; AdminClient API consistency

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Colin P. Mccabe <cm...@confluent.io>, Jason Gustafson <ja...@confluent.io>

Closes #3339 from ijuma/kafka-5275-admin-client-api-consistency


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f60617f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f60617f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f60617f

Branch: refs/heads/trunk
Commit: 0f60617fab5fc6805f522d0b9a213a7f600fab12
Parents: bcf5da0
Author: Ismael Juma <is...@juma.me.uk>
Authored: Thu Jun 15 02:05:41 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 15 02:05:41 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/AdminClientConfig.java  |  9 +++
 .../clients/admin/AlterConfigsOptions.java      |  2 +-
 .../kafka/clients/admin/AlterConfigsResult.java |  2 +-
 .../kafka/clients/admin/CreateAclsResult.java   |  2 +-
 .../clients/admin/CreateTopicsOptions.java      |  2 +-
 .../kafka/clients/admin/CreateTopicsResult.java |  2 +-
 .../kafka/clients/admin/DeleteAclsResult.java   | 32 +++++------
 .../kafka/clients/admin/DeleteTopicsResult.java |  2 +-
 .../kafka/clients/admin/DescribeAclsResult.java |  2 +-
 .../clients/admin/DescribeConfigsResult.java    |  2 +-
 .../clients/admin/DescribeTopicsResult.java     |  2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 60 ++++++++++----------
 .../kafka/clients/admin/ListTopicsOptions.java  |  2 +-
 .../kafka/clients/admin/ListTopicsResult.java   | 13 +++--
 .../apache/kafka/clients/admin/NewTopic.java    |  2 +-
 .../kafka/clients/admin/TopicDescription.java   | 19 ++++---
 .../kafka/clients/admin/TopicListing.java       |  2 +-
 .../apache/kafka/common/TopicPartitionInfo.java |  7 ++-
 .../kafka/common/acl/AccessControlEntry.java    | 10 ++--
 .../common/acl/AccessControlEntryData.java      |  4 +-
 .../common/acl/AccessControlEntryFilter.java    |  4 +-
 .../org/apache/kafka/common/acl/AclBinding.java |  2 +-
 .../kafka/common/acl/AclBindingFilter.java      |  2 +-
 .../apache/kafka/common/acl/AclOperation.java   |  2 +-
 .../kafka/common/acl/AclPermissionType.java     |  2 +-
 .../apache/kafka/common/resource/Resource.java  |  4 +-
 .../kafka/common/resource/ResourceFilter.java   |  2 +-
 .../kafka/common/resource/ResourceType.java     |  2 +-
 .../clients/admin/KafkaAdminClientTest.java     | 28 ++++-----
 .../kafka/common/acl/AclOperationTest.java      |  2 +-
 .../kafka/common/acl/AclPermissionTypeTest.java |  2 +-
 .../kafka/common/resource/ResourceTypeTest.java |  2 +-
 .../apache/kafka/connect/util/TopicAdmin.java   |  2 +-
 .../storage/KafkaConfigBackingStoreTest.java    |  2 +-
 .../storage/KafkaOffsetBackingStoreTest.java    |  2 +-
 .../kafka/api/AdminClientIntegrationTest.scala  | 30 +++++-----
 ...AdminClientWithPoliciesIntegrationTest.scala | 20 +++----
 .../api/SaslSslAdminClientIntegrationTest.scala | 38 ++++++-------
 .../kafka/tools/ClientCompatibilityTest.java    |  6 +-
 39 files changed, 173 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index c106823..ed51e67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -98,6 +98,9 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
     private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
 
+    public static final String RETRIES_CONFIG = "retries";
+    private static final String RETRIES_DOC = "The maximum number of times to retry a call before failing it.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -136,6 +139,12 @@ public class AdminClientConfig extends AbstractConfig {
                                         5 * 60 * 1000,
                                         Importance.MEDIUM,
                                         CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(RETRIES_CONFIG,
+                                        Type.INT,
+                                        5,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        RETRIES_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
index 31f130f..c5665c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java
@@ -52,7 +52,7 @@ public class AlterConfigsOptions {
     /**
      * Return true if the request should be validated without altering the configs.
      */
-    public boolean isValidateOnly() {
+    public boolean shouldValidateOnly() {
         return validateOnly;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
index 5baacf7..df6c1c2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java
@@ -40,7 +40,7 @@ public class AlterConfigsResult {
     /**
      * Return a map from resources to futures which can be used to check the status of the operation on each resource.
      */
-    public Map<ConfigResource, KafkaFuture<Void>> results() {
+    public Map<ConfigResource, KafkaFuture<Void>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
index e575184..2917f17 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java
@@ -41,7 +41,7 @@ public class CreateAclsResult {
      * Return a map from ACL bindings to futures which can be used to check the status of the creation of each ACL
      * binding.
      */
-    public Map<AclBinding, KafkaFuture<Void>> results() {
+    public Map<AclBinding, KafkaFuture<Void>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
index bc24014..cb23a8d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -59,7 +59,7 @@ public class CreateTopicsOptions {
     /**
      * Return true if the request should be validated without creating the topic.
      */
-    public boolean validateOnly() {
+    public boolean shouldValidateOnly() {
         return validateOnly;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
index 3731fad..404cb918 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
@@ -39,7 +39,7 @@ public class CreateTopicsResult {
      * Return a map from topic names to futures, which can be used to check the status of individual
      * topic creations.
      */
-    public Map<String, KafkaFuture<Void>> results() {
+    public Map<String, KafkaFuture<Void>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
index 16a505d..90bc297 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java
@@ -38,22 +38,22 @@ import java.util.Map;
 public class DeleteAclsResult {
 
     /**
-     * A class containing either the deleted ACL or an exception if the delete failed.
+     * A class containing either the deleted ACL binding or an exception if the delete failed.
      */
     public static class FilterResult {
-        private final AclBinding acl;
+        private final AclBinding binding;
         private final ApiException exception;
 
-        FilterResult(AclBinding acl, ApiException exception) {
-            this.acl = acl;
+        FilterResult(AclBinding binding, ApiException exception) {
+            this.binding = binding;
             this.exception = exception;
         }
 
         /**
-         * Return the deleted ACL or null if there was an error.
+         * Return the deleted ACL binding or null if there was an error.
          */
-        public AclBinding acl() {
-            return acl;
+        public AclBinding binding() {
+            return binding;
         }
 
         /**
@@ -68,17 +68,17 @@ public class DeleteAclsResult {
      * A class containing the results of the delete ACLs operation.
      */
     public static class FilterResults {
-        private final List<FilterResult> acls;
+        private final List<FilterResult> values;
 
-        FilterResults(List<FilterResult> acls) {
-            this.acls = acls;
+        FilterResults(List<FilterResult> values) {
+            this.values = values;
         }
 
         /**
-         * Return a list of delete ACLs results.
+         * Return a list of delete ACLs results for a given filter.
          */
-        public List<FilterResult> acls() {
-            return acls;
+        public List<FilterResult> values() {
+            return values;
         }
     }
 
@@ -92,7 +92,7 @@ public class DeleteAclsResult {
      * Return a map from acl filters to futures which can be used to check the status of the deletions by each
      * filter.
      */
-    public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() {
+    public Map<AclBindingFilter, KafkaFuture<FilterResults>> values() {
         return futures;
     }
 
@@ -115,11 +115,11 @@ public class DeleteAclsResult {
                             // have failed if any Future failed.
                             throw new KafkaException("DeleteAclsResult#all: internal error", e);
                         }
-                        for (FilterResult result : results.acls()) {
+                        for (FilterResult result : results.values()) {
                             if (result.exception() != null) {
                                 throw result.exception();
                             }
-                            acls.add(result.acl());
+                            acls.add(result.binding());
                         }
                     }
                     return acls;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
index 2fd6648..9148a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -40,7 +40,7 @@ public class DeleteTopicsResult {
      * Return a map from topic names to futures which can be used to check the status of
      * individual deletions.
      */
-    public Map<String, KafkaFuture<Void>> results() {
+    public Map<String, KafkaFuture<Void>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
index 7c49b28..e09bf43 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java
@@ -40,7 +40,7 @@ public class DescribeAclsResult {
     /**
      * Return a future containing the ACLs requested.
      */
-    public KafkaFuture<Collection<AclBinding>> all() {
+    public KafkaFuture<Collection<AclBinding>> values() {
         return future;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
index c5d4c26..478bf05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -44,7 +44,7 @@ public class DescribeConfigsResult {
      * Return a map from resources to futures which can be used to check the status of the configuration for each
      * resource.
      */
-    public Map<ConfigResource, KafkaFuture<Config>> results() {
+    public Map<ConfigResource, KafkaFuture<Config>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 4e8d433..18f5f9d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -42,7 +42,7 @@ public class DescribeTopicsResult {
      * Return a map from topic names to futures which can be used to check the status of
      * individual topics.
      */
-    public Map<String, KafkaFuture<TopicDescription>> results() {
+    public Map<String, KafkaFuture<TopicDescription>> values() {
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d2927ea..e92b1d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -89,13 +89,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -113,11 +113,6 @@ public class KafkaAdminClient extends AdminClient {
     private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
 
     /**
-     * The maximum number of times to retry a call before failing it.
-     */
-    private static final int MAX_CALL_RETRIES = 5;
-
-    /**
      * The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for.
      */
     private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
@@ -183,6 +178,8 @@ public class KafkaAdminClient extends AdminClient {
      */
     private final TimeoutProcessorFactory timeoutProcessorFactory;
 
+    private final int maxRetries;
+
     /**
      * Get or create a list value from a map.
      *
@@ -357,6 +354,7 @@ public class KafkaAdminClient extends AdminClient {
         this.thread = new KafkaThread(threadName, runnable, false);
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
+        this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
         log.debug("Created Kafka admin client {}", this.clientId);
         thread.start();
@@ -421,22 +419,6 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     /**
-     * Provides a constant node which is known at construction time.
-     */
-    private static class ConstantNodeProvider implements NodeProvider {
-        private final Node node;
-
-        ConstantNodeProvider(Node node) {
-            this.node = node;
-        }
-
-        @Override
-        public Node provide() {
-            return node;
-        }
-    }
-
-    /**
      * Provides the controller node.
      */
     private class ControllerNodeProvider implements NodeProvider {
@@ -506,7 +488,7 @@ public class KafkaAdminClient extends AdminClient {
                 return;
             }
             // If we are out of retries, fail.
-            if (tries > MAX_CALL_RETRIES) {
+            if (tries > maxRetries) {
                 if (log.isDebugEnabled()) {
                     log.debug("{} failed after {} attempt(s)", this, tries,
                         new Exception(prettyPrintException(throwable)));
@@ -1031,7 +1013,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             public AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly());
+                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly());
             }
 
             @Override
@@ -1141,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient {
                 Map<String, TopicListing> topicListing = new HashMap<>();
                 for (String topicName : cluster.topics()) {
                     boolean internal = cluster.internalTopics().contains(topicName);
-                    if (!internal || options.listInternal())
+                    if (!internal || options.shouldListInternal())
                         topicListing.put(topicName, new TopicListing(topicName, internal));
                 }
                 topicListingFuture.complete(topicListing);
@@ -1197,19 +1179,31 @@ public class KafkaAdminClient extends AdminClient {
                         continue;
                     }
                     boolean isInternal = cluster.internalTopics().contains(topicName);
-                    TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>();
                     List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
                     for (PartitionInfo partitionInfo : partitionInfos) {
                         TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
-                            partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()),
+                            partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
                             Arrays.asList(partitionInfo.inSyncReplicas()));
-                        partitions.put(partitionInfo.partition(), topicPartitionInfo);
+                        partitions.add(topicPartitionInfo);
                     }
+                    Collections.sort(partitions, new Comparator<TopicPartitionInfo>() {
+                        @Override
+                        public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) {
+                            return Integer.compare(tp1.partition(), tp2.partition());
+                        }
+                    });
                     TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
                     future.complete(topicDescription);
                 }
             }
 
+            private Node leader(PartitionInfo partitionInfo) {
+                if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id())
+                    return null;
+                return partitionInfo.leader();
+            }
+
             @Override
             boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
                 if (supportsDisablingTopicCreation) {
@@ -1247,10 +1241,16 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse response = (MetadataResponse) abstractResponse;
                 describeClusterFuture.complete(response.brokers());
-                controllerFuture.complete(response.controller());
+                controllerFuture.complete(controller(response));
                 clusterIdFuture.complete(response.clusterId());
             }
 
+            private Node controller(MetadataResponse response) {
+                if (response.controller() == null || response.controller().id() == MetadataResponse.NO_CONTROLLER_ID)
+                    return null;
+                return response.controller();
+            }
+
             @Override
             void handleFailure(Throwable throwable) {
                 describeClusterFuture.completeExceptionally(throwable);
@@ -1530,7 +1530,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             public AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly());
+                return new AlterConfigsRequest.Builder(requestMap, options.shouldValidateOnly());
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 4068e88..81d834f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -61,7 +61,7 @@ public class ListTopicsOptions {
     /**
      * Return true if we should list internal topics.
      */
-    public boolean listInternal() {
+    public boolean shouldListInternal() {
         return listInternal;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index 97987ae..e54b3de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The result of the {@link AdminClient#listTopics()} call.
@@ -39,14 +40,14 @@ public class ListTopicsResult {
     /**
      * Return a future which yields a map of topic names to TopicListing objects.
      */
-    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+    public KafkaFuture<Map<String, TopicListing>> namesToListings() {
         return future;
     }
 
     /**
      * Return a future which yields a collection of TopicListing objects.
      */
-    public KafkaFuture<Collection<TopicListing>> descriptions() {
+    public KafkaFuture<Collection<TopicListing>> listings() {
         return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
             @Override
             public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
@@ -58,11 +59,11 @@ public class ListTopicsResult {
     /**
      * Return a future which yields a collection of topic names.
      */
-    public KafkaFuture<Collection<String>> names() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+    public KafkaFuture<Set<String>> names() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() {
             @Override
-            public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
-                return namesToDescriptions.keySet();
+            public Set<String> apply(Map<String, TopicListing> namesToListings) {
+                return namesToListings.keySet();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
index 314337c..ff09579 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -68,7 +68,7 @@ public class NewTopic {
     /**
      * The number of partitions for the new topic or -1 if a replica assignment has been specified.
      */
-    public int partitions() {
+    public int numPartitions() {
         return numPartitions;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index 88ec1a0..c220892 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -20,7 +20,7 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.NavigableMap;
+import java.util.List;
 
 /**
  * A detailed description of a single topic in the cluster.
@@ -28,17 +28,17 @@ import java.util.NavigableMap;
 public class TopicDescription {
     private final String name;
     private final boolean internal;
-    private final NavigableMap<Integer, TopicPartitionInfo> partitions;
+    private final List<TopicPartitionInfo> partitions;
 
     /**
      * Create an instance with the specified parameters.
      *
      * @param name The topic name
      * @param internal Whether the topic is internal to Kafka
-     * @param partitions A map from partition id to its leadership and replica information
+     * @param partitions A list of partitions where the index represents the partition id and the element contains
+     *                   leadership and replica information for that partition.
      */
-    public TopicDescription(String name, boolean internal,
-                    NavigableMap<Integer, TopicPartitionInfo> partitions) {
+    public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions) {
         this.name = name;
         this.internal = internal;
         this.partitions = partitions;
@@ -55,20 +55,21 @@ public class TopicDescription {
      * Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic:
      * __consumer_offsets.
      */
-    public boolean internal() {
+    public boolean isInternal() {
         return internal;
     }
 
     /**
-     * A map from partition id to the leadership and replica information for that partition.
+     * A list of partitions where the index represents the partition id and the element contains leadership and replica
+     * information for that partition.
      */
-    public NavigableMap<Integer, TopicPartitionInfo> partitions() {
+    public List<TopicPartitionInfo> partitions() {
         return partitions;
     }
 
     @Override
     public String toString() {
         return "(name=" + name + ", internal=" + internal + ", partitions=" +
-            Utils.mkString(partitions, "[", "]", "=", ",") + ")";
+            Utils.join(partitions, ",") + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
index 738c2ef..e5124be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -46,7 +46,7 @@ public class TopicListing {
      * Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic:
      * __consumer_offsets.
      */
-    public boolean internal() {
+    public boolean isInternal() {
         return internal;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
index 7656cd2..be69318 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common;
 
 import org.apache.kafka.common.utils.Utils;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -42,8 +43,8 @@ public class TopicPartitionInfo {
     public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
         this.partition = partition;
         this.leader = leader;
-        this.replicas = replicas;
-        this.isr = isr;
+        this.replicas = Collections.unmodifiableList(replicas);
+        this.isr = Collections.unmodifiableList(isr);
     }
 
     /**
@@ -54,7 +55,7 @@ public class TopicPartitionInfo {
     }
 
     /**
-     * Return the leader of the partition or {@link Node#noNode()} if there is none.
+     * Return the leader of the partition or null if there is none.
      */
     public Node leader() {
         return leader;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
index 1796762..d5e05df 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
@@ -42,9 +42,11 @@ public class AccessControlEntry {
         Objects.requireNonNull(principal);
         Objects.requireNonNull(host);
         Objects.requireNonNull(operation);
-        assert operation != AclOperation.ANY;
+        if (operation == AclOperation.ANY)
+            throw new IllegalArgumentException("operation must not be ANY");
         Objects.requireNonNull(permissionType);
-        assert permissionType != AclPermissionType.ANY;
+        if (permissionType == AclPermissionType.ANY)
+            throw new IllegalArgumentException("permissionType must not be ANY");
         this.data = new AccessControlEntryData(principal, host, operation, permissionType);
     }
 
@@ -91,8 +93,8 @@ public class AccessControlEntry {
     /**
      * Return true if this AclResource has any UNKNOWN components.
      */
-    public boolean unknown() {
-        return data.unknown();
+    public boolean isUnknown() {
+        return data.isUnknown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
index cf69263..ad7660d 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
@@ -83,8 +83,8 @@ class AccessControlEntryData {
     /**
      * Return true if there are any UNKNOWN components.
      */
-    boolean unknown() {
-        return operation.unknown() || permissionType.unknown();
+    boolean isUnknown() {
+        return operation.isUnknown() || permissionType.isUnknown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
index 2d8e9fb..a95303e 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -95,8 +95,8 @@ public class AccessControlEntryFilter {
     /**
      * Return true if there are any UNKNOWN components.
      */
-    public boolean unknown() {
-        return data.unknown();
+    public boolean isUnknown() {
+        return data.isUnknown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index fd2a756..ea58434 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -49,7 +49,7 @@ public class AclBinding {
      * Return true if this binding has any UNKNOWN components.
      */
     public boolean unknown() {
-        return resource.unknown() || entry.unknown();
+        return resource.isUnknown() || entry.isUnknown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index ad4a811..807b730 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -57,7 +57,7 @@ public class AclBindingFilter {
      * Return true if this filter has any UNKNOWN components.
      */
     public boolean unknown() {
-        return resourceFilter.unknown() || entryFilter.unknown();
+        return resourceFilter.unknown() || entryFilter.isUnknown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
index c5d5b1a..3da18cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -158,7 +158,7 @@ public enum AclOperation {
     /**
      * Return true if this operation is UNKNOWN.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return this == UNKNOWN;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
index d963135..c5b077c 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
@@ -100,7 +100,7 @@ public enum AclPermissionType {
     /**
      * Return true if this permission type is UNKNOWN.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return this == UNKNOWN;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
index 484c207..f41f41a 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -83,8 +83,8 @@ public class Resource {
     /**
      * Return true if this Resource has any UNKNOWN components.
      */
-    public boolean unknown() {
-        return resourceType.unknown();
+    public boolean isUnknown() {
+        return resourceType.isUnknown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 18042fa..5032660 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -71,7 +71,7 @@ public class ResourceFilter {
      * Return true if this ResourceFilter has any UNKNOWN components.
      */
     public boolean unknown() {
-        return resourceType.unknown();
+        return resourceType.isUnknown();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
index 0874f91..d83382d 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
@@ -110,7 +110,7 @@ public enum ResourceType {
     /**
      * Return whether this resource type is UNKNOWN.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return this == UNKNOWN;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dfab018..cd6ed6b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -220,17 +220,17 @@ public class KafkaAdminClientTest {
                         add(ACL1);
                         add(ACL2);
                     }}));
-            assertCollectionIs(env.adminClient().describeAcls(FILTER1).all().get(), ACL1, ACL2);
+            assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2);
 
             // Test a call where we get back no results.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
                 Collections.<AclBinding>emptySet()));
-            assertTrue(env.adminClient().describeAcls(FILTER2).all().get().isEmpty());
+            assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
 
             // Test a call where we get back an error.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
                 new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet()));
-            assertFutureError(env.adminClient().describeAcls(FILTER2).all(), SecurityDisabledException.class);
+            assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class);
         }
     }
 
@@ -251,8 +251,8 @@ public class KafkaAdminClientTest {
                         add(ACL1);
                         add(ACL2);
                     }});
-            assertCollectionIs(results.results().keySet(), ACL1, ACL2);
-            for (KafkaFuture<Void> future : results.results().values()) {
+            assertCollectionIs(results.values().keySet(), ACL1, ACL2);
+            for (KafkaFuture<Void> future : results.values().values()) {
                 future.get();
             }
             results.all().get();
@@ -267,9 +267,9 @@ public class KafkaAdminClientTest {
                     add(ACL1);
                     add(ACL2);
                 }});
-            assertCollectionIs(results.results().keySet(), ACL1, ACL2);
-            assertFutureError(results.results().get(ACL1), SecurityDisabledException.class);
-            results.results().get(ACL2).get();
+            assertCollectionIs(results.values().keySet(), ACL1, ACL2);
+            assertFutureError(results.values().get(ACL1), SecurityDisabledException.class);
+            results.values().get(ACL2).get();
             assertFutureError(results.all(), SecurityDisabledException.class);
         }
     }
@@ -295,12 +295,12 @@ public class KafkaAdminClientTest {
                         add(FILTER1);
                         add(FILTER2);
                     }});
-            Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.results();
+            Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values();
             FilterResults filter1Results = filterResults.get(FILTER1).get();
-            assertEquals(null, filter1Results.acls().get(0).exception());
-            assertEquals(ACL1, filter1Results.acls().get(0).acl());
-            assertEquals(null, filter1Results.acls().get(1).exception());
-            assertEquals(ACL2, filter1Results.acls().get(1).acl());
+            assertEquals(null, filter1Results.values().get(0).exception());
+            assertEquals(ACL1, filter1Results.values().get(0).binding());
+            assertEquals(null, filter1Results.values().get(1).exception());
+            assertEquals(ACL2, filter1Results.values().get(1).binding());
             assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class);
             assertFutureError(results.all(), SecurityDisabledException.class);
 
@@ -318,7 +318,7 @@ public class KafkaAdminClientTest {
                             add(FILTER1);
                             add(FILTER2);
                         }});
-            assertTrue(results.results().get(FILTER2).get().acls().isEmpty());
+            assertTrue(results.values().get(FILTER2).get().values().isEmpty());
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
index ba09499..a9b27d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java
@@ -55,7 +55,7 @@ public class AclOperationTest {
     public void testIsUnknown() throws Exception {
         for (AclOperationTestInfo info : INFOS) {
             assertEquals(info.operation + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.operation.unknown());
+                info.unknown, info.operation.isUnknown());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
index 15b9068..3f018d7 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java
@@ -47,7 +47,7 @@ public class AclPermissionTypeTest {
     public void testIsUnknown() throws Exception {
         for (AclPermissionTypeTestInfo info : INFOS) {
             assertEquals(info.ty + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.ty.unknown());
+                info.unknown, info.ty.isUnknown());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
index 9adade1..d5f13bc 100644
--- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java
@@ -48,7 +48,7 @@ public class ResourceTypeTest {
     public void testIsUnknown() throws Exception {
         for (AclResourceTypeTestInfo info : INFOS) {
             assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown,
-                info.unknown, info.resourceType.unknown());
+                info.unknown, info.resourceType.isUnknown());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index a3b4218..adc3378 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -216,7 +216,7 @@ public class TopicAdmin implements AutoCloseable {
 
         // Attempt to create any missing topics
         CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false);
-        Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).results();
+        Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).values();
 
         // Iterate over each future so that we can handle individual failures like when some topics already exist
         Set<String> newlyCreatedTopicNames = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 9da574d..07d192b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -169,7 +169,7 @@ public class KafkaConfigBackingStoreTest {
         assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
         assertEquals(TOPIC, capturedNewTopic.getValue().name());
-        assertEquals(1, capturedNewTopic.getValue().partitions());
+        assertEquals(1, capturedNewTopic.getValue().numPartitions());
         assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
         configStorage.start();
         configStorage.stop();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 70d7f40..8cd2f0b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -130,7 +130,7 @@ public class KafkaOffsetBackingStoreTest {
         assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
         assertEquals(TOPIC, capturedNewTopic.getValue().name());
-        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().partitions());
+        assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
         assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
         store.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5e94ffe..4c74bca 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -133,7 +133,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     client.createTopics(newTopics.asJava).all.get()
     waitForTopics(client, topics, List())
 
-    val results = client.createTopics(newTopics.asJava).results()
+    val results = client.createTopics(newTopics.asJava).values()
     assertTrue(results.containsKey("mytopic"))
     assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
     assertTrue(results.containsKey("mytopic2"))
@@ -143,7 +143,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     assertEquals(topics.toSet, topicToDescription.keySet.asScala)
 
     val topic0 = topicToDescription.get("mytopic")
-    assertEquals(false, topic0.internal)
+    assertEquals(false, topic0.isInternal)
     assertEquals("mytopic", topic0.name)
     assertEquals(2, topic0.partitions.size)
     val topic0Partition0 = topic0.partitions.get(0)
@@ -158,7 +158,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id))
 
     val topic1 = topicToDescription.get("mytopic2")
-    assertEquals(false, topic1.internal)
+    assertEquals(false, topic1.isInternal)
     assertEquals("mytopic2", topic1.name)
     assertEquals(3, topic1.partitions.size)
     for (partitionId <- 0 until 3) {
@@ -192,7 +192,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     waitForTopics(client, Seq(existingTopic), List())
 
     val nonExistingTopic = "non-existing"
-    val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
+    val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).values
     assertEquals(existingTopic, results.get(existingTopic).get.name)
     intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
     assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
@@ -303,7 +303,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
   @Test
   def testAclOperations(): Unit = {
     client = AdminClient.create(createConfig())
-    assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException])
+    assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException])
     assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(),
         classOf[SecurityDisabledException])
     assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(),
@@ -422,7 +422,7 @@ object AdminClientIntegrationTest {
       topicResource2 -> new Config(topicConfigEntries2)
     ).asJava)
 
-    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet)
     alterResult.all.get
 
     // Verify that topics were updated correctly
@@ -454,7 +454,7 @@ object AdminClientIntegrationTest {
       topicResource2 -> new Config(topicConfigEntries2)
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet)
+    assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet)
     alterResult.all.get
 
     // Verify that topics were not updated due to validateOnly = true
@@ -495,10 +495,10 @@ object AdminClientIntegrationTest {
       brokerResource -> new Config(brokerConfigEntries)
     ).asJava)
 
-    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.values.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that first and third resources were not updated and second was updated
     var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)
@@ -523,10 +523,10 @@ object AdminClientIntegrationTest {
       brokerResource -> new Config(brokerConfigEntries)
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException])
+    alterResult.values.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index 7d3c54c..8e8e825 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -128,11 +128,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
       brokerResource -> new Config(brokerConfigEntries)
     ).asJava)
 
-    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    alterResult.values.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that the second resource was updated and the others were not
     var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
@@ -158,11 +158,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
       topicResource3 -> new Config(topicConfigEntries3)
     ).asJava, new AlterConfigsOptions().validateOnly(true))
 
-    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet)
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
-    alterResult.results.get(topicResource2).get
-    assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
-    assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
+    assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
+    alterResult.values.get(topicResource2).get
+    assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
+    assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
 
     // Verify that no resources are updated since validate_only = true
     describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index d0e0806..b4e09b3 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -96,25 +96,25 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
   @Test
   override def testAclOperations(): Unit = {
     client = AdminClient.create(createConfig())
-    assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all.get().size)
+    assertEquals(6, client.describeAcls(AclBindingFilter.ANY).values.get().size)
     val results = client.createAcls(List(acl2, acl3).asJava)
-    assertEquals(Set(acl2, acl3), results.results.keySet().asScala)
-    results.results.values().asScala.foreach(value => value.get)
+    assertEquals(Set(acl2, acl3), results.values.keySet().asScala)
+    results.values.values().asScala.foreach(value => value.get)
     val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
     val results2 = client.createAcls(List(aclUnknown).asJava)
-    assertEquals(Set(aclUnknown), results2.results.keySet().asScala)
+    assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
     assertFutureExceptionTypeEquals(results2.all, classOf[InvalidRequestException])
-    val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).results
+    val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).values
     assertEquals(Set(ACL1.toFilter, acl2.toFilter, acl3.toFilter), results3.keySet.asScala)
-    assertEquals(0, results3.get(ACL1.toFilter).get.acls.size())
-    assertEquals(Set(acl2), results3.get(acl2.toFilter).get.acls.asScala.map(_.acl).toSet)
-    assertEquals(Set(acl3), results3.get(acl3.toFilter).get.acls.asScala.map(_.acl).toSet)
+    assertEquals(0, results3.get(ACL1.toFilter).get.values.size())
+    assertEquals(Set(acl2), results3.get(acl2.toFilter).get.values.asScala.map(_.binding).toSet)
+    assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet)
   }
 
   def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val results = client.describeAcls(filter).all.get()
+      val results = client.describeAcls(filter).values.get()
       acls == results.asScala.toSet
     }, "timed out waiting for ACLs")
   }
@@ -123,7 +123,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
   def testAclOperations2(): Unit = {
     client = AdminClient.create(createConfig())
     val results = client.createAcls(List(acl2, acl2).asJava)
-    assertEquals(Set(acl2, acl2), results.results.keySet().asScala)
+    assertEquals(Set(acl2, acl2), results.values.keySet().asScala)
     results.all.get()
     waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
 
@@ -133,9 +133,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     waitForDescribeAcls(client, filterA, Set())
 
     val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
-    assertEquals(Set(filterA, filterB), results2.results.keySet().asScala)
-    assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(_.acl).toSet)
-    assertEquals(Set(acl2), results2.results.get(filterB).get.acls.asScala.map(_.acl).toSet)
+    assertEquals(Set(filterA, filterB), results2.values.keySet().asScala)
+    assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
+    assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
 
     waitForDescribeAcls(client, filterB, Set())
   }
@@ -148,9 +148,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
       new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
     val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
-    assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results.keySet().asScala)
-    assertFutureExceptionTypeEquals(results.results.get(clusterAcl), classOf[InvalidRequestException])
-    assertFutureExceptionTypeEquals(results.results.get(emptyResourceNameAcl), classOf[InvalidRequestException])
+    assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
+    assertFutureExceptionTypeEquals(results.values.get(clusterAcl), classOf[InvalidRequestException])
+    assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
   }
 
   private def verifyCauseIsClusterAuth(e: Throwable): Unit = {
@@ -196,7 +196,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
             verifyCauseIsClusterAuth(e)
             true
           case Success(_) =>
-            assertEquals(Set(fooAcl), result.results.get(fooAcl.toFilter).get.acls.asScala.map(_.acl).toSet)
+            assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet)
             true
         }
       }
@@ -212,14 +212,14 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
         new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
       val results = client.describeAcls(userAcl.toFilter)
       if (expectAuth) {
-        Try(results.all.get) match {
+        Try(results.values.get) match {
           case Failure(e) =>
             verifyCauseIsClusterAuth(e)
             false
           case Success(acls) => Set(userAcl).equals(acls.asScala.toSet)
         }
       } else {
-        Try(results.all.get) match {
+        Try(results.values.get) match {
           case Failure(e) =>
             verifyCauseIsClusterAuth(e)
             true

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 431b53b..b9288d7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -287,13 +287,13 @@ public class ClientCompatibilityTest {
                     }
                 });
             while (true) {
-                Collection<TopicListing> listings = client.listTopics().descriptions().get();
+                Collection<TopicListing> listings = client.listTopics().listings().get();
                 if (!testConfig.createTopicsSupported)
                     break;
                 boolean foundNewTopic = false;
                 for (TopicListing listing : listings) {
                     if (listing.name().equals("newtopic")) {
-                        if (listing.internal())
+                        if (listing.isInternal())
                             throw new KafkaException("Did not expect newtopic to be an internal topic.");
                         foundNewTopic = true;
                     }
@@ -308,7 +308,7 @@ public class ClientCompatibilityTest {
                     @Override
                     public void invoke() throws Throwable {
                         try {
-                            client.describeAcls(AclBindingFilter.ANY).all().get();
+                            client.describeAcls(AclBindingFilter.ANY).values().get();
                         } catch (ExecutionException e) {
                             if (e.getCause() instanceof SecurityDisabledException)
                                 return;