You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 16:37:43 UTC

[2/3] kafka git commit: KAFKA-5265; Move ACLs, Config, Topic classes into org.apache.kafka.common

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
deleted file mode 100644
index a51c1c8..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-
-/**
- * The results of the describeCluster call.
- */
-@InterfaceStability.Unstable
-public class DescribeClusterResults {
-    private final KafkaFuture<Collection<Node>> nodes;
-    private final KafkaFuture<Node> controller;
-    private final KafkaFuture<String> clusterId;
-
-    DescribeClusterResults(KafkaFuture<Collection<Node>> nodes,
-                           KafkaFuture<Node> controller,
-                           KafkaFuture<String> clusterId) {
-        this.nodes = nodes;
-        this.controller = controller;
-        this.clusterId = clusterId;
-    }
-
-    /**
-     * Returns a future which yields a collection of nodes.
-     */
-    public KafkaFuture<Collection<Node>> nodes() {
-        return nodes;
-    }
-
-    /**
-     * Returns a future which yields the current controller id.
-     * Note that this may yield null, if the controller ID is not yet known.
-     */
-    public KafkaFuture<Node> controller() {
-        return controller;
-    }
-
-    /**
-     * Returns a future which yields the current cluster Id.
-     * Note that this may yield null, if the cluster version is too old.
-     */
-    public KafkaFuture<String> clusterId() {
-        return clusterId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
new file mode 100644
index 0000000..2379724
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Unstable
+public class DescribeConfigsResult {
+
+    private final Map<ConfigResource, KafkaFuture<Config>> futures;
+
+    DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<ConfigResource, KafkaFuture<Config>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Map<ConfigResource, Config>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+                thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
+                    @Override
+                    public Map<ConfigResource, Config> apply(Void v) {
+                        Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
+                        for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
+                            try {
+                                configs.put(entry.getKey(), entry.getValue().get());
+                            } catch (InterruptedException | ExecutionException e) {
+                                // This should be unreachable, because allOf ensured that all the futures
+                                // completed successfully.
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        return configs;
+                    }
+                });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
deleted file mode 100644
index c29872a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-@InterfaceStability.Unstable
-public class DescribeConfigsResults {
-
-    private final Map<ConfigResource, KafkaFuture<Config>> futures;
-
-    DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) {
-        this.futures = futures;
-    }
-
-    public Map<ConfigResource, KafkaFuture<Config>> results() {
-        return futures;
-    }
-
-    public KafkaFuture<Map<ConfigResource, Config>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-                thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() {
-                    @Override
-                    public Map<ConfigResource, Config> apply(Void v) {
-                        Map<ConfigResource, Config> configs = new HashMap<>(futures.size());
-                        for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) {
-                            try {
-                                configs.put(entry.getKey(), entry.getValue().get());
-                            } catch (InterruptedException | ExecutionException e) {
-                                // This should be unreachable, because allOf ensured that all the futures
-                                // completed successfully.
-                                throw new RuntimeException(e);
-                            }
-                        }
-                        return configs;
-                    }
-                });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
new file mode 100644
index 0000000..e7cd6b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopic call.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResult {
+    private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+    DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual topics.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<String, TopicDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+                @Override
+                public Map<String, TopicDescription> apply(Void v) {
+                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+                        try {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return descriptions;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
deleted file mode 100644
index 5c309bb..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-/**
- * The results of the describeTopic call.
- */
-@InterfaceStability.Unstable
-public class DescribeTopicsResults {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
-
-    DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from topic names to futures which can be used to check the status of
-     * individual topics.
-     */
-    public Map<String, KafkaFuture<TopicDescription>> results() {
-        return futures;
-    }
-
-    /**
-     * Return a future which succeeds only if all the topic descriptions succeed.
-     */
-    public KafkaFuture<Map<String, TopicDescription>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
-                @Override
-                public Map<String, TopicDescription> apply(Void v) {
-                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
-                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
-                        try {
-                            descriptions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures
-                            // completed successfully.
-                            throw new RuntimeException(e);
-                        }
-                    }
-                    return descriptions;
-                }
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 98fc3f3..9fa0cad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,14 +25,18 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResult;
-import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults;
+import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
+import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -995,7 +999,7 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public CreateTopicResults createTopics(final Collection<NewTopic> newTopics,
+    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
         final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
@@ -1046,11 +1050,11 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         }, now);
-        return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+        return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
     @Override
-    public DeleteTopicResults deleteTopics(final Collection<String> topicNames,
+    public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
                                            DeleteTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
         for (String topicName : topicNames) {
@@ -1099,11 +1103,11 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         }, now);
-        return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+        return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
     @Override
-    public ListTopicsResults listTopics(final ListTopicsOptions options) {
+    public ListTopicsResult listTopics(final ListTopicsOptions options) {
         final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
@@ -1132,11 +1136,11 @@ public class KafkaAdminClient extends AdminClient {
                 topicListingFuture.completeExceptionally(throwable);
             }
         }, now);
-        return new ListTopicsResults(topicListingFuture);
+        return new ListTopicsResult(topicListingFuture);
     }
 
     @Override
-    public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
+    public DescribeTopicsResult describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
@@ -1190,11 +1194,11 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         }, now);
-        return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+        return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
     }
 
     @Override
-    public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
+    public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
         final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
         final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
         final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
@@ -1223,11 +1227,11 @@ public class KafkaAdminClient extends AdminClient {
             }
         }, now);
 
-        return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture);
+        return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture);
     }
 
     @Override
-    public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
+    public ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
         final long now = time.milliseconds();
         final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
         Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
@@ -1254,12 +1258,12 @@ public class KafkaAdminClient extends AdminClient {
                     }
                 }, now);
         }
-        return new ApiVersionsResults(nodeFutures);
+        return new ApiVersionsResult(nodeFutures);
 
     }
 
     @Override
-    public DescribeAclsResults describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
+    public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) {
         final long now = time.milliseconds();
         final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>();
         runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()),
@@ -1285,11 +1289,11 @@ public class KafkaAdminClient extends AdminClient {
                 future.completeExceptionally(throwable);
             }
         }, now);
-        return new DescribeAclsResults(future);
+        return new DescribeAclsResult(future);
     }
 
     @Override
-    public CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
+    public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
         final long now = time.milliseconds();
         final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
         final List<AclCreation> aclCreations = new ArrayList<>();
@@ -1340,11 +1344,11 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new CreateAclsResults(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
+        return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
     }
 
     @Override
-    public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
+    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
         final long now = time.milliseconds();
         final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>();
         final List<AclBindingFilter> filterList = new ArrayList<>();
@@ -1392,11 +1396,11 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
+        return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
     }
 
     @Override
-    public DescribeConfigsResults describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
+    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
         final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
         final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
 
@@ -1487,7 +1491,7 @@ public class KafkaAdminClient extends AdminClient {
         Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
         allFutures.putAll(singleRequestFutures);
         allFutures.putAll(brokerFutures);
-        return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+        return new DescribeConfigsResult(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
     }
 
     private Resource configResourceToResource(ConfigResource configResource) {
@@ -1506,7 +1510,7 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
+    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
         final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
         for (ConfigResource configResource : configs.keySet()) {
             futures.put(configResource, new KafkaFutureImpl<Void>());
@@ -1548,6 +1552,6 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+        return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
new file mode 100644
index 0000000..7b2fae8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the listTopics call.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsResult {
+    final KafkaFuture<Map<String, TopicListing>> future;
+
+    ListTopicsResult(KafkaFuture<Map<String, TopicListing>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a map of topic names to TopicListing objects.
+     */
+    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+        return future;
+    }
+
+    /**
+     * Return a future which yields a collection of TopicListing objects.
+     */
+    public KafkaFuture<Collection<TopicListing>> descriptions() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+            @Override
+            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.values();
+            }
+        });
+    }
+
+    /**
+     * Return a future which yields a collection of topic names.
+     */
+    public KafkaFuture<Collection<String>> names() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+            @Override
+            public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.keySet();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
deleted file mode 100644
index 7e9448d..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * The result of the listTopics call.
- */
-@InterfaceStability.Unstable
-public class ListTopicsResults {
-    final KafkaFuture<Map<String, TopicListing>> future;
-
-    ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
-        this.future = future;
-    }
-
-    /**
-     * Return a future which yields a map of topic names to TopicListing objects.
-     */
-    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
-        return future;
-    }
-
-    /**
-     * Return a future which yields a collection of TopicListing objects.
-     */
-    public KafkaFuture<Collection<TopicListing>> descriptions() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
-            @Override
-            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
-                return namesToDescriptions.values();
-            }
-        });
-    }
-
-    /**
-     * Return a future which yields a collection of topic names.
-     */
-    public KafkaFuture<Collection<String>> names() {
-        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
-            @Override
-            public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
-                return namesToDescriptions.keySet();
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
deleted file mode 100644
index 9148aac..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * Represents a cluster resource with a tuple of (type, name).
- */
-public class Resource {
-    private final ResourceType resourceType;
-    private final String name;
-
-    public Resource(ResourceType resourceType, String name) {
-        Objects.requireNonNull(resourceType);
-        this.resourceType = resourceType;
-        Objects.requireNonNull(name);
-        this.name = name;
-    }
-
-    public ResourceType resourceType() {
-        return resourceType;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    /**
-     * Create a filter which matches only this Resource.
-     */
-    public ResourceFilter toFilter() {
-        return new ResourceFilter(resourceType, name);
-    }
-
-    @Override
-    public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
-    }
-
-    /**
-     * Return true if this Resource has any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return resourceType.unknown();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof Resource))
-            return false;
-        Resource other = (Resource) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(resourceType, name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
deleted file mode 100644
index 6f453b6..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import java.util.Objects;
-
-/**
- * A filter which matches Resource objects.
- */
-public class ResourceFilter {
-    private final ResourceType resourceType;
-    private final String name;
-
-    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
-
-    public ResourceFilter(ResourceType resourceType, String name) {
-        Objects.requireNonNull(resourceType);
-        this.resourceType = resourceType;
-        this.name = name;
-    }
-
-    public ResourceType resourceType() {
-        return resourceType;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
-    }
-
-    /**
-     * Return true if this ResourceFilter has any UNKNOWN components.
-     */
-    public boolean unknown() {
-        return resourceType.unknown();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ResourceFilter))
-            return false;
-        ResourceFilter other = (ResourceFilter) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(resourceType, name);
-    }
-
-    public boolean matches(Resource other) {
-        if ((name != null) && (!name.equals(other.name())))
-            return false;
-        if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
-            return false;
-        return true;
-    }
-
-    public boolean matchesAtMostOne() {
-        return findIndefiniteField() == null;
-    }
-
-    public String findIndefiniteField() {
-        if (resourceType == ResourceType.ANY)
-            return "Resource type is ANY.";
-        if (resourceType == ResourceType.UNKNOWN)
-            return "Resource type is UNKNOWN.";
-        if (name == null)
-            return "Resource name is NULL.";
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
deleted file mode 100644
index ca4fa0a..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import java.util.HashMap;
-import java.util.Locale;
-
-/**
- * Represents a type of resource which an ACL can be applied to.
- */
-public enum ResourceType {
-    /**
-     * Represents any ResourceType which this client cannot understand,
-     * perhaps because this client is too old.
-     */
-    UNKNOWN((byte) 0),
-
-    /**
-     * In a filter, matches any ResourceType.
-     */
-    ANY((byte) 1),
-
-    /**
-     * A Kafka topic.
-     */
-    TOPIC((byte) 2),
-
-    /**
-     * A consumer group.
-     */
-    GROUP((byte) 3),
-
-    /**
-     * The cluster as a whole.
-     */
-    CLUSTER((byte) 4),
-
-    /**
-     * A broker.
-     */
-    BROKER((byte) 5);
-
-    private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>();
-
-    static {
-        for (ResourceType resourceType : ResourceType.values()) {
-            CODE_TO_VALUE.put(resourceType.code, resourceType);
-        }
-    }
-
-    /**
-     * Parse the given string as an ACL resource type.
-     *
-     * @param str    The string to parse.
-     *
-     * @return       The ResourceType, or UNKNOWN if the string could not be matched.
-     */
-    public static ResourceType fromString(String str) throws IllegalArgumentException {
-        try {
-            return ResourceType.valueOf(str.toUpperCase(Locale.ROOT));
-        } catch (IllegalArgumentException e) {
-            return UNKNOWN;
-        }
-    }
-
-    public static ResourceType fromCode(byte code) {
-        ResourceType resourceType = CODE_TO_VALUE.get(code);
-        if (resourceType == null) {
-            return UNKNOWN;
-        }
-        return resourceType;
-    }
-
-    private final byte code;
-
-    ResourceType(byte code) {
-        this.code = code;
-    }
-
-    public byte code() {
-        return code;
-    }
-
-    public boolean unknown() {
-        return this == UNKNOWN;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index f13dfff..bf1431e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.NavigableMap;

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
deleted file mode 100644
index 5241602..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.List;
-
-public class TopicPartitionInfo {
-    private final int partition;
-    private final Node leader;
-    private final List<Node> replicas;
-    private final List<Node> isr;
-
-    public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
-        this.partition = partition;
-        this.leader = leader;
-        this.replicas = replicas;
-        this.isr = isr;
-    }
-
-    public int partition() {
-        return partition;
-    }
-
-    public Node leader() {
-        return leader;
-    }
-
-    public List<Node> replicas() {
-        return replicas;
-    }
-
-    public List<Node> isr() {
-        return isr;
-    }
-
-    public String toString() {
-        return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
-            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
new file mode 100644
index 0000000..70352bd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+public class TopicPartitionInfo {
+    private final int partition;
+    private final Node leader;
+    private final List<Node> replicas;
+    private final List<Node> isr;
+
+    public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
+        this.partition = partition;
+        this.leader = leader;
+        this.replicas = replicas;
+        this.isr = isr;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public Node leader() {
+        return leader;
+    }
+
+    public List<Node> replicas() {
+        return replicas;
+    }
+
+    public List<Node> isr() {
+        return isr;
+    }
+
+    public String toString() {
+        return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
new file mode 100644
index 0000000..68464b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * Represents an access control entry.  ACEs are a tuple of principal, host,
+ * operation, and permissionType.
+ */
+public class AccessControlEntry {
+    final AccessControlEntryData data;
+
+    public AccessControlEntry(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+        Objects.requireNonNull(principal);
+        Objects.requireNonNull(host);
+        Objects.requireNonNull(operation);
+        assert operation != AclOperation.ANY;
+        Objects.requireNonNull(permissionType);
+        assert permissionType != AclPermissionType.ANY;
+        this.data = new AccessControlEntryData(principal, host, operation, permissionType);
+    }
+
+    public String principal() {
+        return data.principal();
+    }
+
+    public String host() {
+        return data.host();
+    }
+
+    public AclOperation operation() {
+        return data.operation();
+    }
+
+    public AclPermissionType permissionType() {
+        return data.permissionType();
+    }
+
+    /**
+     * Create a filter which matches only this AccessControlEntry.
+     */
+    public AccessControlEntryFilter toFilter() {
+        return new AccessControlEntryFilter(data);
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+
+    /**
+     * Return true if this AclResource has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return data.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntry))
+            return false;
+        AccessControlEntry other = (AccessControlEntry) o;
+        return data.equals(other.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return data.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
new file mode 100644
index 0000000..cf69263
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * An internal, private class which contains the data stored in AccessControlEntry and
+ * AccessControlEntryFilter objects.
+ */
+class AccessControlEntryData {
+    private final String principal;
+    private final String host;
+    private final AclOperation operation;
+    private final AclPermissionType permissionType;
+
+    AccessControlEntryData(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+        this.principal = principal;
+        this.host = host;
+        this.operation = operation;
+        this.permissionType = permissionType;
+    }
+
+    String principal() {
+        return principal;
+    }
+
+    String host() {
+        return host;
+    }
+
+    AclOperation operation() {
+        return operation;
+    }
+
+    AclPermissionType permissionType() {
+        return permissionType;
+    }
+
+    /**
+     * Returns a string describing an ANY or UNKNOWN field, or null if there is
+     * no such field.
+     */
+    public String findIndefiniteField() {
+        if (principal() == null)
+            return "Principal is NULL";
+        if (host() == null)
+            return "Host is NULL";
+        if (operation() == AclOperation.ANY)
+            return "Operation is ANY";
+        if (operation() == AclOperation.UNKNOWN)
+            return "Operation is UNKNOWN";
+        if (permissionType() == AclPermissionType.ANY)
+            return "Permission type is ANY";
+        if (permissionType() == AclPermissionType.UNKNOWN)
+            return "Permission type is UNKNOWN";
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "(principal=" + (principal == null ? "<any>" : principal) +
+               ", host=" + (host == null ? "<any>" : host) +
+               ", operation=" + operation +
+               ", permissionType=" + permissionType + ")";
+    }
+
+    /**
+     * Return true if there are any UNKNOWN components.
+     */
+    boolean unknown() {
+        return operation.unknown() || permissionType.unknown();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntryData))
+            return false;
+        AccessControlEntryData other = (AccessControlEntryData) o;
+        return Objects.equals(principal, other.principal) &&
+            Objects.equals(host, other.host) &&
+            Objects.equals(operation, other.operation) &&
+            Objects.equals(permissionType, other.permissionType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(principal, host, operation, permissionType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
new file mode 100644
index 0000000..7817865
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.Objects;
+
+/**
+ * Represents a filter which matches access control entries.
+ */
+public class AccessControlEntryFilter {
+    private final AccessControlEntryData data;
+
+    public static final AccessControlEntryFilter ANY =
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY);
+
+    public AccessControlEntryFilter(String principal, String host, AclOperation operation, AclPermissionType permissionType) {
+        Objects.requireNonNull(operation);
+        Objects.requireNonNull(permissionType);
+        this.data = new AccessControlEntryData(principal, host, operation, permissionType);
+    }
+
+    /**
+     * This is a non-public constructor used in AccessControlEntry#toFilter
+     *
+     * @param data     The access control data.
+     */
+    AccessControlEntryFilter(AccessControlEntryData data) {
+        this.data = data;
+    }
+
+    public String principal() {
+        return data.principal();
+    }
+
+    public String host() {
+        return data.host();
+    }
+
+    public AclOperation operation() {
+        return data.operation();
+    }
+
+    public AclPermissionType permissionType() {
+        return data.permissionType();
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+
+    /**
+     * Return true if there are any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return data.unknown();
+    }
+
+    /**
+     * Returns true if this filter matches the given AccessControlEntry.
+     */
+    public boolean matches(AccessControlEntry other) {
+        if ((principal() != null) && (!data.principal().equals(other.principal())))
+            return false;
+        if ((host() != null) && (!host().equals(other.host())))
+            return false;
+        if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation())))
+            return false;
+        if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType())))
+            return false;
+        return true;
+    }
+
+    /**
+     * Returns true if this filter could only match one ACE-- in other words, if
+     * there are no ANY or UNKNOWN fields.
+     */
+    public boolean matchesAtMostOne() {
+        return findIndefiniteField() == null;
+    }
+
+    /**
+     * Returns a string describing an ANY or UNKNOWN field, or null if there is
+     * no such field.
+     */
+    public String findIndefiniteField() {
+        return data.findIndefiniteField();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AccessControlEntryFilter))
+            return false;
+        AccessControlEntryFilter other = (AccessControlEntryFilter) o;
+        return data.equals(other.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return data.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
new file mode 100644
index 0000000..91c1c79
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.Resource;
+
+import java.util.Objects;
+
+/**
+ * Represents a binding between a resource and an access control entry.
+ */
+public class AclBinding {
+    private final Resource resource;
+    private final AccessControlEntry entry;
+
+    public AclBinding(Resource resource, AccessControlEntry entry) {
+        Objects.requireNonNull(resource);
+        this.resource = resource;
+        Objects.requireNonNull(entry);
+        this.entry = entry;
+    }
+
+    /**
+     * Return true if this binding has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resource.unknown() || entry.unknown();
+    }
+
+    public Resource resource() {
+        return resource;
+    }
+
+    public final AccessControlEntry entry() {
+        return entry;
+    }
+
+    /**
+     * Create a filter which matches only this AclBinding.
+     */
+    public AclBindingFilter toFilter() {
+        return new AclBindingFilter(resource.toFilter(), entry.toFilter());
+    }
+
+    @Override
+    public String toString() {
+        return "(resource=" + resource + ", entry=" + entry + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AclBinding))
+            return false;
+        AclBinding other = (AclBinding) o;
+        return resource.equals(other.resource) && entry.equals(other.entry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resource, entry);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
new file mode 100644
index 0000000..765fac2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import org.apache.kafka.common.resource.ResourceFilter;
+import org.apache.kafka.common.resource.ResourceType;
+
+import java.util.Objects;
+
+/**
+ * A filter which can match AclBinding objects.
+ */
+public class AclBindingFilter {
+    private final ResourceFilter resourceFilter;
+    private final AccessControlEntryFilter entryFilter;
+
+    /**
+     * A filter which matches any ACL binding.
+     */
+    public static final AclBindingFilter ANY = new AclBindingFilter(
+        new ResourceFilter(ResourceType.ANY, null),
+        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+
+    public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) {
+        Objects.requireNonNull(resourceFilter);
+        this.resourceFilter = resourceFilter;
+        Objects.requireNonNull(entryFilter);
+        this.entryFilter = entryFilter;
+    }
+
+    /**
+     * Return true if this filter has any UNKNOWN components.
+     */
+    public boolean unknown() {
+        return resourceFilter.unknown() || entryFilter.unknown();
+    }
+
+    public ResourceFilter resourceFilter() {
+        return resourceFilter;
+    }
+
+    public final AccessControlEntryFilter entryFilter() {
+        return entryFilter;
+    }
+
+    @Override
+    public String toString() {
+        return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AclBindingFilter))
+            return false;
+        AclBindingFilter other = (AclBindingFilter) o;
+        return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter);
+    }
+
+    public boolean matchesAtMostOne() {
+        return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne();
+    }
+
+    public String findIndefiniteField() {
+        String indefinite = resourceFilter.findIndefiniteField();
+        if (indefinite != null)
+            return indefinite;
+        return entryFilter.findIndefiniteField();
+    }
+
+    public boolean matches(AclBinding binding) {
+        return resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resourceFilter, entryFilter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
new file mode 100644
index 0000000..c63320d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents an operation which an ACL grants or denies permission to perform.
+ */
+public enum AclOperation {
+    /**
+     * Represents any AclOperation which this client cannot understand, perhaps because this
+     * client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any AclOperation.
+     */
+    ANY((byte) 1),
+
+    /**
+     * ALL operation.
+     */
+    ALL((byte) 2),
+
+    /**
+     * READ operation.
+     */
+    READ((byte) 3),
+
+    /**
+     * WRITE operation.
+     */
+    WRITE((byte) 4),
+
+    /**
+     * CREATE operation.
+     */
+    CREATE((byte) 5),
+
+    /**
+     * DELETE operation.
+     */
+    DELETE((byte) 6),
+
+    /**
+     * ALTER operation.
+     */
+    ALTER((byte) 7),
+
+    /**
+     * DESCRIBE operation.
+     */
+    DESCRIBE((byte) 8),
+
+    /**
+     * CLUSTER_ACTION operation.
+     */
+    CLUSTER_ACTION((byte) 9),
+
+    /**
+     * DESCRIBE_CONFIGS operation.
+     */
+    DESCRIBE_CONFIGS((byte) 10),
+
+    /**
+     * ALTER_CONFIGS operation.
+     */
+    ALTER_CONFIGS((byte) 11),
+
+    /**
+     * IDEMPOTENT_WRITE operation.
+     */
+    IDEMPOTENT_WRITE((byte) 12);
+
+    private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>();
+
+    static {
+        for (AclOperation operation : AclOperation.values()) {
+            CODE_TO_VALUE.put(operation.code, operation);
+        }
+    }
+
+    /**
+     * Parse the given string as an ACL operation.
+     *
+     * @param str    The string to parse.
+     *
+     * @return       The AclOperation, or UNKNOWN if the string could not be matched.
+     */
+    public static AclOperation fromString(String str) throws IllegalArgumentException {
+        try {
+            return AclOperation.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            return UNKNOWN;
+        }
+    }
+
+    public static AclOperation fromCode(byte code) {
+        AclOperation operation = CODE_TO_VALUE.get(code);
+        if (operation == null) {
+            return UNKNOWN;
+        }
+        return operation;
+    }
+
+    private final byte code;
+
+    AclOperation(byte code) {
+        this.code = code;
+    }
+
+    public byte code() {
+        return code;
+    }
+
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
new file mode 100644
index 0000000..8c77938
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.acl;
+
+import java.util.HashMap;
+import java.util.Locale;
+
+/**
+ * Represents whether an ACL grants or denies permissions.
+ */
+public enum AclPermissionType {
+    /**
+     * Represents any AclPermissionType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any AclPermissionType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * Disallows access.
+     */
+    DENY((byte) 2),
+
+    /**
+     * Grants access.
+     */
+    ALLOW((byte) 3);
+
+    private final static HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new HashMap<>();
+
+    static {
+        for (AclPermissionType permissionType : AclPermissionType.values()) {
+            CODE_TO_VALUE.put(permissionType.code, permissionType);
+        }
+    }
+
+    /**
+    * Parse the given string as an ACL permission.
+    *
+    * @param str    The string to parse.
+    *
+    * @return       The AclPermissionType, or UNKNOWN if the string could not be matched.
+    */
+    public static AclPermissionType fromString(String str) {
+        try {
+            return AclPermissionType.valueOf(str.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            return UNKNOWN;
+        }
+    }
+
+    public static AclPermissionType fromCode(byte code) {
+        AclPermissionType permissionType = CODE_TO_VALUE.get(code);
+        if (permissionType == null) {
+            return UNKNOWN;
+        }
+        return permissionType;
+    }
+
+    private final byte code;
+
+    AclPermissionType(byte code) {
+        this.code = code;
+    }
+
+    public byte code() {
+        return code;
+    }
+
+    public boolean unknown() {
+        return this == UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
new file mode 100644
index 0000000..5395671
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+public final class ConfigResource {
+
+    public enum Type {
+        BROKER, TOPIC, UNKNOWN;
+    }
+
+    private final Type type;
+    private final String name;
+
+    public ConfigResource(Type type, String name) {
+        this.type = type;
+        this.name = name;
+    }
+
+    public Type type() {
+        return type;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConfigResource that = (ConfigResource) o;
+
+        return type == that.type && name.equals(that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = type.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ConfigResource{type=" + type + ", name='" + name + "'}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
new file mode 100755
index 0000000..554c97b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+/**
+ * Keys that can be used to configure a topic.  These keys are useful when creating or reconfiguring a
+ * topic using the AdminClient.
+ *
+ * The intended pattern is for broker configs to include a `log.` prefix. For example, to set the default broker
+ * cleanup policy, one would set log.cleanup.policy instead of cleanup.policy. Unfortunately, there are many cases
+ * where this pattern is not followed.
+ */
+// This is a public API, so we should not remove or alter keys without a discussion and a deprecation period.
+// Eventually this should replace LogConfig.scala.
+public class TopicConfig {
+    public static final String SEGMENT_BYTES_CONFIG = "segment.bytes";
+    public static final String SEGMENT_BYTES_DOC = "This configuration controls the segment file size for " +
+        "the log. Retention and cleaning is always done a file at a time so a larger segment size means " +
+        "fewer files but less granular control over retention.";
+
+    public static final String SEGMENT_MS_CONFIG = "segment.ms";
+    public static final String SEGMENT_MS_DOC = "This configuration controls the period of time after " +
+        "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " +
+        "can delete or compact old data.";
+
+    public static final String SEGMENT_JITTER_MS_CONFIG = "segment.jitter.ms";
+    public static final String SEGMENT_JITTER_MS_DOC = "The maximum random jitter subtracted from the scheduled " +
+        "segment roll time to avoid thundering herds of segment rolling";
+
+    public static final String SEGMENT_INDEX_BYTES_CONFIG = "segment.index.bytes";
+    public static final String SEGMENT_INDEX_BYTES_DOC = "This configuration controls the size of the index that " +
+        "maps offsets to file positions. We preallocate this index file and shrink it only after log " +
+        "rolls. You generally should not need to change this setting.";
+
+    public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
+    public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +
+        "which we will force an fsync of data written to the log. For example if this was set to 1 " +
+        "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
+        "In general we recommend you not set this and use replication for durability and allow the " +
+        "operating system's background flush capabilities as it is more efficient. This setting can " +
+        "be overridden on a per-topic basis (see <a href=\"#topic-config\">the per-topic configuration section</a>).";
+
+    public static final String FLUSH_MS_CONFIG = "flush.ms";
+    public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +
+        "force an fsync of data written to the log. For example if this was set to 1000 " +
+        "we would fsync after 1000 ms had passed. In general we recommend you not set " +
+        "this and use replication for durability and allow the operating system's background " +
+        "flush capabilities as it is more efficient.";
+
+    public static final String RETENTION_BYTES_CONFIG = "retention.bytes";
+    public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a log can grow " +
+        "to before we will discard old log segments to free up space if we are using the " +
+        "\"delete\" retention policy. By default there is no size limit only a time limit.";
+
+    public static final String RETENTION_MS_CONFIG = "retention.ms";
+    public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " +
+        "log before we will discard old log segments to free up space if we are using the " +
+        "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
+        "their data.";
+
+    public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
+    public static final String MAX_MESSAGE_BYTES_DOC = "This is largest message size Kafka will allow to be " +
+        "appended. Note that if you increase this size you must also increase your consumer's fetch size so " +
+        "they can fetch messages this large.";
+
+    public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
+    public static final String INDEX_INTERVAL_BYTES_DOCS = "This setting controls how frequently " +
+        "Kafka adds an index entry to it's offset index. The default setting ensures that we index a " +
+        "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
+        "position in the log but makes the index larger. You probably don't need to change this.";
+
+    public static final String FILE_DELETE_DELAY_MS_CONFIG = "file.delete.delay.ms";
+    public static final String FILE_DELETE_DELAY_MS_DOC = "The time to wait before deleting a file from the " +
+        "filesystem";
+
+    public static final String DELETE_RETENTION_MS_CONFIG = "delete.retention.ms";
+    public static final String DELETE_RETENTION_MS_DOC = "The amount of time to retain delete tombstone markers " +
+        "for <a href=\"#compaction\">log compacted</a> topics. This setting also gives a bound " +
+        "on the time in which a consumer must complete a read if they begin from offset 0 " +
+        "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
+        "tombstones may be collected before they complete their scan).";
+
+    public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";
+    public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +
+        "uncompacted in the log. Only applicable for logs that are being compacted.";
+
+    public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
+    public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +
+        "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
+        "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
+        "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
+        "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
+        "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
+        "space in the log.";
+
+    public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
+    public static final String CLEANUP_POLICY_COMPACT = "compact";
+    public static final String CLEANUP_POLICY_DELETE = "delete";
+    public static final String CLEANUP_POLICY_DOC = "A string that is either \"" + CLEANUP_POLICY_DELETE +
+        "\" or \"" + CLEANUP_POLICY_COMPACT + "\". This string designates the retention policy to use on " +
+        "old log segments. The default policy (\"delete\") will discard old segments when their retention " +
+        "time or size limit has been reached. The \"compact\" setting will enable <a href=\"#compaction\">log " +
+        "compaction</a> on the topic.";
+
+    public static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
+    public static final String UNCLEAN_LEADER_ELECTION_ENABLE_DOC = "Indicates whether to enable replicas " +
+        "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
+        "loss.";
+
+    public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
+    public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
+        "this configuration specifies the minimum number of replicas that must acknowledge " +
+        "a write for the write to be considered successful. If this minimum cannot be met, " +
+        "then the producer will raise an exception (either NotEnoughReplicas or " +
+        "NotEnoughReplicasAfterAppend).<br>When used together, min.insync.replicas and acks " +
+        "allow you to enforce greater durability guarantees. A typical scenario would be to " +
+        "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
+        "produce with acks of \"all\". This will ensure that the producer raises an exception " +
+        "if a majority of replicas do not receive a write.";
+
+    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+    public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
+        "This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
+        "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
+        "original compression codec set by the producer.";
+
+    public static final String PREALLOCATE_CONFIG = "preallocate";
+    public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
+        "creating a new log segment.";
+
+    public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
+    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
+        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
+        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
+        "version, the user is certifying that all the existing messages on disk are smaller or equal than the " +
+        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
+        "they will receive messages with a format that they don't understand.";
+
+    public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
+    public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
+        "message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`";
+
+    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms";
+    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " +
+        "the timestamp when a broker receives a message and the timestamp specified in the message. If " +
+        "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
+        "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index f792bbd..757b5af 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -17,11 +17,11 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntry;
-import org.apache.kafka.clients.admin.AclBinding;
-import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/da9a171c/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 8a9ee19..246b5e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -16,11 +16,11 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.AccessControlEntryFilter;
-import org.apache.kafka.clients.admin.AclBindingFilter;
-import org.apache.kafka.clients.admin.ResourceFilter;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;