You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/01 23:24:50 UTC

[3/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2472 from cmccabe/KAFKA-3265


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aed28d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aed28d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aed28d1

Branch: refs/heads/trunk
Commit: 4aed28d1897c6c5293f372cb4fc44ab363dfc365
Parents: c96656e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 2 00:16:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 2 00:20:22 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |    5 +
 checkstyle/suppressions.xml                     |    4 +-
 .../java/org/apache/kafka/clients/Metadata.java |    4 +-
 .../apache/kafka/clients/admin/AdminClient.java |  186 +++
 .../kafka/clients/admin/AdminClientConfig.java  |  163 +++
 .../kafka/clients/admin/ApiVersionsOptions.java |   37 +
 .../kafka/clients/admin/ApiVersionsResults.java |   63 ++
 .../kafka/clients/admin/CreateTopicResults.java |   49 +
 .../clients/admin/CreateTopicsOptions.java      |   47 +
 .../kafka/clients/admin/DeleteTopicResults.java |   50 +
 .../clients/admin/DeleteTopicsOptions.java      |   37 +
 .../clients/admin/DescribeClusterOptions.java   |   37 +
 .../clients/admin/DescribeClusterResults.java   |   43 +
 .../clients/admin/DescribeTopicsOptions.java    |   37 +
 .../clients/admin/DescribeTopicsResults.java    |   68 ++
 .../kafka/clients/admin/KafkaAdminClient.java   | 1065 ++++++++++++++++++
 .../kafka/clients/admin/ListTopicsOptions.java  |   54 +
 .../kafka/clients/admin/ListTopicsResults.java  |   67 ++
 .../apache/kafka/clients/admin/NewTopic.java    |   85 ++
 .../kafka/clients/admin/TopicDescription.java   |   56 +
 .../kafka/clients/admin/TopicListing.java       |   44 +
 .../kafka/clients/admin/TopicPartitionInfo.java |   58 +
 .../java/org/apache/kafka/common/Cluster.java   |   33 +-
 .../org/apache/kafka/common/KafkaFuture.java    |  155 +++
 .../kafka/common/internals/KafkaFutureImpl.java |  264 +++++
 .../kafka/common/network/ChannelBuilder.java    |    2 +-
 .../apache/kafka/common/network/Selector.java   |    2 +-
 .../apache/kafka/common/protocol/Errors.java    |  507 +++++++--
 .../common/requests/CreateTopicsResponse.java   |    5 +
 .../kafka/common/requests/MetadataResponse.java |    3 +-
 .../org/apache/kafka/common/utils/Utils.java    |    2 +-
 .../clients/admin/KafkaAdminClientTest.java     |  206 ++++
 .../apache/kafka/common/KafkaFutureTest.java    |  164 +++
 .../main/scala/kafka/admin/AdminClient.scala    |    5 +
 .../integration/kafka/api/AdminClientTest.scala |  263 -----
 .../api/KafkaAdminClientIntegrationTest.scala   |  162 +++
 .../kafka/api/LegacyAdminClientTest.scala       |  266 +++++
 .../api/SaslSslAdminClientIntegrationTest.scala |   26 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |    3 +
 39 files changed, 3943 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d7851a5..d40c4d4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -47,6 +47,7 @@
   <subpackage name="common">
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
     <allow pkg="org.apache.kafka.test" />
 
     <subpackage name="config">
@@ -134,6 +135,10 @@
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.clients.producer" />
     </subpackage>
+
+    <subpackage name="admin">
+      <allow pkg="org.apache.kafka.clients.admin" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="server">

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index eae8dde..dd41f94 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/>
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -35,7 +35,7 @@
               files="DefaultRecordBatch.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/>
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*/protocol/Errors.java"/>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5bfdb64..9ff629d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -332,6 +332,7 @@ public final class Metadata {
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
+        Node controller = null;
         String clusterId = null;
         if (cluster != null) {
             clusterId = cluster.clusterResource().clusterId();
@@ -346,7 +347,8 @@ public final class Metadata {
                 }
             }
             nodes = cluster.nodes();
+            controller  = cluster.controller();
         }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics);
+        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
new file mode 100644
index 0000000..a97219b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
+ * brokers, and configurations.
+ *
+ * @see KafkaAdminClient
+ */
+@InterfaceStability.Unstable
+public abstract class AdminClient implements AutoCloseable {
+    /**
+     * Create a new AdminClient with the given configuration.
+     *
+     * @param conf          The configuration.
+     * @return              The new KafkaAdminClient.
+     */
+    public static AdminClient create(Map<String, Object> conf) {
+        return KafkaAdminClient.create(new AdminClientConfig(conf));
+    }
+
+    /**
+     * Close the AdminClient and release all associated resources.
+     */
+    public abstract void close();
+
+    /**
+     * Create a batch of new topics with the default options.
+     *
+     * @param newTopics         The new topics to create.
+     * @return                  The CreateTopicResults.
+     */
+    public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
+        return createTopics(newTopics, new CreateTopicsOptions());
+    }
+
+    /**
+     * Create a batch of new topics.
+     *
+     * It may take several seconds after AdminClient#createTopics returns
+     * success for all the brokers to become aware that the topics have been created.
+     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * may not return information about the new topics.
+     *
+     * @param newTopics         The new topics to create.
+     * @param options           The options to use when creating the new topics.
+     * @return                  The CreateTopicResults.
+     */
+    public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
+                                                    CreateTopicsOptions options);
+
+    /**
+     * Similar to {@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)},
+     * but uses the default options.
+     *
+     * @param topics            The topic names to delete.
+     * @return                  The DeleteTopicResults.
+     */
+    public DeleteTopicResults deleteTopics(Collection<String> topics) {
+        return deleteTopics(topics, new DeleteTopicsOptions());
+    }
+
+    /**
+     * Delete a batch of topics.
+     *
+     * It may take several seconds after AdminClient#deleteTopics returns
+     * success for all the brokers to become aware that the topics are gone.
+     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * may continue to return information about the deleted topics.
+     *
+     * If delete.topic.enable is false on the brokers, deleteTopics will mark
+     * the topics for deletion, but not actually delete them.  The futures will
+     * return successfully in this case.
+     *
+     * @param topics            The topic names to delete.
+     * @param options           The options to use when deleting the topics.
+     * @return                  The DeleteTopicResults.
+     */
+    public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+    /**
+     * List the topics available in the cluster with the default options.
+     *
+     * @return                  The ListTopicsResults.
+     */
+    public ListTopicsResults listTopics() {
+        return listTopics(new ListTopicsOptions());
+    }
+
+    /**
+     * List the topics available in the cluster.
+     *
+     * @param options           The options to use when listing the topics.
+     * @return                  The ListTopicsResults.
+     */
+    public abstract ListTopicsResults listTopics(ListTopicsOptions options);
+
+    /**
+     * Describe an individual topic in the cluster, with the default options.
+     *
+     * See {@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)}
+     *
+     * @param topicNames        The names of the topics to describe.
+     *
+     * @return                  The DescribeTopicsResults.
+     */
+    public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
+        return describeTopics(topicNames, new DescribeTopicsOptions());
+    }
+
+    /**
+     * Describe an individual topic in the cluster.
+     *
+     * Note that if auto.create.topics.enable is true on the brokers,
+     * AdminClient#describeTopics(topicNames) may create the topics named in topicNames.
+     * There are two workarounds: either use AdminClient#listTopics and ensure
+     * that the topic is present before describing, or disable
+     * auto.create.topics.enable.
+     *
+     * @param topicNames        The names of the topics to describe.
+     * @param options           The options to use when describing the topic.
+     *
+     * @return                  The DescribeTopicsResults.
+     */
+    public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
+                                                         DescribeTopicsOptions options);
+
+    /**
+     * Get information about the nodes in the cluster, using the default options.
+     *
+     * @return                  The DescribeClusterResults.
+     */
+    public DescribeClusterResults describeCluster() {
+        return describeCluster(new DescribeClusterOptions());
+    }
+
+    /**
+     * Get information about the nodes in the cluster.
+     *
+     * @param options           The options to use when getting information about the cluster.
+     * @return                  The DescribeClusterResults.
+     */
+    public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);
+
+    /**
+     * Get information about the api versions of nodes in the cluster with the default options.
+     * See {@link AdminClient#apiVersions(Collection, ApiVersionsOptions)}
+     *
+     * @param nodes             The nodes to get information about, or null to get information about all nodes.
+     * @return                  The ApiVersionsResults.
+     */
+    public ApiVersionsResults apiVersions(Collection<Node> nodes) {
+        return apiVersions(nodes, new ApiVersionsOptions());
+    }
+
+    /**
+     * Get information about the api versions of nodes in the cluster.
+     *
+     * @param nodes             The nodes to get information about, or null to get information about all nodes.
+     * @param options           The options to use when getting api versions of the nodes.
+     * @return                  The ApiVersionsResults.
+     */
+    public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
new file mode 100644
index 0000000..368a42e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * The AdminClient configuration keys
+ */
+public class AdminClientConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /**
+     * <code>bootstrap.servers</code>
+     */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+    private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
+
+    /**
+     * <code>reconnect.backoff.ms</code>
+     */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+    private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+
+    /**
+     * <code>retry.backoff.ms</code>
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
+                "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
+                "some failure scenarios.";
+
+    /** <code>connections.max.idle.ms</code> */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+    private static final String CLIENT_ID_DOC = CommonClientConfigs.CLIENT_ID_DOC;
+
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
+    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+    private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
+
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+    private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+    private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
+
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+    private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
+
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;
+
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
+    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+    private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+    private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+
+    static {
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+                                        Type.LIST,
+                                        Importance.HIGH,
+                                        BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        50L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        RECONNECT_BACKOFF_MS_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        100L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        RETRY_BACKOFF_MS_DOC)
+                                .define(REQUEST_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        120000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        REQUEST_TIMEOUT_MS_DOC)
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        5 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+                                .define(METRICS_RECORDING_LEVEL_CONFIG,
+                                    Type.STRING,
+                                    Sensor.RecordingLevel.INFO.toString(),
+                                    in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                    Importance.LOW,
+                                    METRICS_RECORDING_LEVEL_DOC)
+                                // security support
+                                .define(SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
+    }
+
+    AdminClientConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
new file mode 100644
index 0000000..cbcd234
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsOptions {
+    private Integer timeoutMs = null;
+
+    public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
new file mode 100644
index 0000000..456c64d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Results of the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsResults {
+    private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
+
+    ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<Node, KafkaFuture<NodeApiVersions>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Map<Node, NodeApiVersions>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
+                @Override
+                public Map<Node, NodeApiVersions> apply(Void v) {
+                    Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
+                    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
+                        try {
+                            versions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return versions;
+                }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
new file mode 100644
index 0000000..03da7d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of createTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicResults {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures, which can be used to check the status of individual
+     * topic creations.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if all the topic creations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
new file mode 100644
index 0000000..c1f3944
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for a topic creation request. Setters return this object so calls can be chained.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicsOptions {
+    // Stays null until a caller supplies a value.
+    private Integer timeoutMs;
+    // Stays false until a caller turns it on.
+    private boolean validateOnly;
+
+    /**
+     * Return the configured timeout, or null if none has been set.
+     */
+    public Integer timeoutMs() {
+        return this.timeoutMs;
+    }
+
+    /**
+     * Set the timeout for the request.
+     * NOTE(review): the unit is presumably milliseconds, going by the name; confirm against
+     * the code that consumes this value.
+     *
+     * @param timeoutMs the timeout to use; may be null
+     * @return this options object
+     */
+    public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    /**
+     * Return the current value of the validate-only flag (false unless it has been set).
+     */
+    public boolean validateOnly() {
+        return this.validateOnly;
+    }
+
+    /**
+     * Set the validate-only flag.
+     *
+     * @param validateOnly the new value of the flag
+     * @return this options object
+     */
+    public CreateTopicsOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
new file mode 100644
index 0000000..3dd4889
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Holds the outcome of the deleteTopics call as one future per topic.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicResults {
+    // NOTE(review): package-private, unlike the private field in the sibling result classes;
+    // confirm whether anything in this package reads it directly before narrowing it.
+    final Map<String, KafkaFuture<Void>> futures;
+
+    /**
+     * @param futures map from topic name to the future tracking that topic's deletion;
+     *                the map is stored as given, not copied
+     */
+    DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        // Flatten the per-topic futures into an array so they can be combined into one.
+        KafkaFuture<?>[] pending = futures.values().toArray(new KafkaFuture<?>[0]);
+        return KafkaFuture.allOf(pending);
+    }
+
+    /**
+     * Return the per-topic view of the call: a map from topic names to futures, each of
+     * which can be used to check the status of that individual deletion.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return this.futures;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
new file mode 100644
index 0000000..3630968
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for deleteTopics. The setter returns this object so calls can be chained.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicsOptions {
+    // Stays null until a caller supplies a value.
+    private Integer timeoutMs;
+
+    /**
+     * Return the configured timeout, or null if none has been set.
+     */
+    public Integer timeoutMs() {
+        return this.timeoutMs;
+    }
+
+    /**
+     * Set the timeout for the call.
+     * NOTE(review): the unit is presumably milliseconds, going by the name; confirm against
+     * the code that consumes this value.
+     *
+     * @param timeoutMs the timeout to use; may be null
+     * @return this options object
+     */
+    public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
new file mode 100644
index 0000000..604ee13
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the describeCluster call. The setter returns this object so calls can be
+ * chained.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterOptions {
+    // Stays null until a caller supplies a value.
+    private Integer timeoutMs;
+
+    /**
+     * Return the configured timeout, or null if none has been set.
+     */
+    public Integer timeoutMs() {
+        return this.timeoutMs;
+    }
+
+    /**
+     * Set the timeout for the call.
+     * NOTE(review): the unit is presumably milliseconds, going by the name; confirm against
+     * the code that consumes this value.
+     *
+     * @param timeoutMs the timeout to use; may be null
+     * @return this options object
+     */
+    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
new file mode 100644
index 0000000..5ee834b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Holds the outcome of the describeCluster call: a single future for the cluster's nodes.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterResults {
+    private final KafkaFuture<Collection<Node>> nodesFuture;
+
+    /**
+     * @param future the future that will yield the collection of nodes
+     */
+    DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
+        this.nodesFuture = future;
+    }
+
+    /**
+     * Returns a future which yields a collection of nodes. This is the same future
+     * instance that was passed to the constructor.
+     */
+    public KafkaFuture<Collection<Node>> nodes() {
+        return this.nodesFuture;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
new file mode 100644
index 0000000..1bf6632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeTopics. The setter returns this object so calls can be chained.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsOptions {
+    // Stays null until a caller supplies a value.
+    private Integer timeoutMs;
+
+    /**
+     * Return the configured timeout, or null if none has been set.
+     */
+    public Integer timeoutMs() {
+        return this.timeoutMs;
+    }
+
+    /**
+     * Set the timeout for the call.
+     * NOTE(review): the unit is presumably milliseconds, going by the name; confirm against
+     * the code that consumes this value.
+     *
+     * @param timeoutMs the timeout to use; may be null
+     * @return this options object
+     */
+    public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
new file mode 100644
index 0000000..630ba95
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopics call, held as one future per topic.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResults {
+    private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+    /**
+     * @param futures map from topic name to the future that will yield that topic's
+     *                description; the map is stored as given, not copied
+     */
+    DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual topic descriptions.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed. On success
+     * it yields a new map from topic name to that topic's description.
+     */
+    public KafkaFuture<Map<String, TopicDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+                @Override
+                public Map<String, TopicDescription> apply(Void v) {
+                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+                        try {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully. If it does happen, restore the interrupt
+                            // flag so the interruption is not silently lost when we wrap it.
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        } catch (ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return descriptions;
+                }
+            });
+    }
+}