You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/01 23:24:50 UTC
[3/3] kafka git commit: KAFKA-3265;
Add a public AdminClient API in Java (KIP-117)
KAFKA-3265; Add a public AdminClient API in Java (KIP-117)
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2472 from cmccabe/KAFKA-3265
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aed28d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aed28d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aed28d1
Branch: refs/heads/trunk
Commit: 4aed28d1897c6c5293f372cb4fc44ab363dfc365
Parents: c96656e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 2 00:16:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 2 00:20:22 2017 +0100
----------------------------------------------------------------------
checkstyle/import-control.xml | 5 +
checkstyle/suppressions.xml | 4 +-
.../java/org/apache/kafka/clients/Metadata.java | 4 +-
.../apache/kafka/clients/admin/AdminClient.java | 186 +++
.../kafka/clients/admin/AdminClientConfig.java | 163 +++
.../kafka/clients/admin/ApiVersionsOptions.java | 37 +
.../kafka/clients/admin/ApiVersionsResults.java | 63 ++
.../kafka/clients/admin/CreateTopicResults.java | 49 +
.../clients/admin/CreateTopicsOptions.java | 47 +
.../kafka/clients/admin/DeleteTopicResults.java | 50 +
.../clients/admin/DeleteTopicsOptions.java | 37 +
.../clients/admin/DescribeClusterOptions.java | 37 +
.../clients/admin/DescribeClusterResults.java | 43 +
.../clients/admin/DescribeTopicsOptions.java | 37 +
.../clients/admin/DescribeTopicsResults.java | 68 ++
.../kafka/clients/admin/KafkaAdminClient.java | 1065 ++++++++++++++++++
.../kafka/clients/admin/ListTopicsOptions.java | 54 +
.../kafka/clients/admin/ListTopicsResults.java | 67 ++
.../apache/kafka/clients/admin/NewTopic.java | 85 ++
.../kafka/clients/admin/TopicDescription.java | 56 +
.../kafka/clients/admin/TopicListing.java | 44 +
.../kafka/clients/admin/TopicPartitionInfo.java | 58 +
.../java/org/apache/kafka/common/Cluster.java | 33 +-
.../org/apache/kafka/common/KafkaFuture.java | 155 +++
.../kafka/common/internals/KafkaFutureImpl.java | 264 +++++
.../kafka/common/network/ChannelBuilder.java | 2 +-
.../apache/kafka/common/network/Selector.java | 2 +-
.../apache/kafka/common/protocol/Errors.java | 507 +++++++--
.../common/requests/CreateTopicsResponse.java | 5 +
.../kafka/common/requests/MetadataResponse.java | 3 +-
.../org/apache/kafka/common/utils/Utils.java | 2 +-
.../clients/admin/KafkaAdminClientTest.java | 206 ++++
.../apache/kafka/common/KafkaFutureTest.java | 164 +++
.../main/scala/kafka/admin/AdminClient.scala | 5 +
.../integration/kafka/api/AdminClientTest.scala | 263 -----
.../api/KafkaAdminClientIntegrationTest.scala | 162 +++
.../kafka/api/LegacyAdminClientTest.scala | 266 +++++
.../api/SaslSslAdminClientIntegrationTest.scala | 26 +
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +
39 files changed, 3943 insertions(+), 384 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d7851a5..d40c4d4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -47,6 +47,7 @@
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="config">
@@ -134,6 +135,10 @@
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.clients.producer" />
</subpackage>
+
+ <subpackage name="admin">
+ <allow pkg="org.apache.kafka.clients.admin" />
+ </subpackage>
</subpackage>
<subpackage name="server">
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index eae8dde..dd41f94 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/>
+ files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
@@ -35,7 +35,7 @@
files="DefaultRecordBatch.java"/>
<suppress checks="ClassDataAbstractionCoupling"
- files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/>
+ files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files=".*/protocol/Errors.java"/>
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5bfdb64..9ff629d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -332,6 +332,7 @@ public final class Metadata {
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.emptySet();
+ Node controller = null;
String clusterId = null;
if (cluster != null) {
clusterId = cluster.clusterResource().clusterId();
@@ -346,7 +347,8 @@ public final class Metadata {
}
}
nodes = cluster.nodes();
+ controller = cluster.controller();
}
- return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics);
+ return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
new file mode 100644
index 0000000..a97219b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
+ * brokers, and configurations.
+ *
+ * @see KafkaAdminClient
+ */
+@InterfaceStability.Unstable
+public abstract class AdminClient implements AutoCloseable {
+ /**
+ * Create a new AdminClient with the given configuration.
+ *
+ * @param conf The configuration.
+ * @return The new KafkaAdminClient.
+ */
+ public static AdminClient create(Map<String, Object> conf) {
+ return KafkaAdminClient.create(new AdminClientConfig(conf));
+ }
+
+ /**
+ * Close the AdminClient and release all associated resources.
+ */
+ public abstract void close();
+
+ /**
+ * Create a batch of new topics with the default options.
+ *
+ * @param newTopics The new topics to create.
+ * @return The CreateTopicsResults.
+ */
+ public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
+ return createTopics(newTopics, new CreateTopicsOptions());
+ }
+
+ /**
+ * Create a batch of new topics.
+ *
+ * It may take several seconds after AdminClient#createTopics returns
+ * success for all the brokers to become aware that the topics have been created.
+ * During this time, AdminClient#listTopics and AdminClient#describeTopics
+ * may not return information about the new topics.
+ *
+ * @param newTopics The new topics to create.
+ * @param options The options to use when creating the new topics.
+ * @return The CreateTopicsResults.
+ */
+ public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
+ CreateTopicsOptions options);
+
+ /**
+ * Similar to #{@link AdminClient#deleteTopics(Collection<String>, DeleteTopicsOptions),
+ * but uses the default options.
+ *
+ * @param topics The topic names to delete.
+ * @return The DeleteTopicsResults.
+ */
+ public DeleteTopicResults deleteTopics(Collection<String> topics) {
+ return deleteTopics(topics, new DeleteTopicsOptions());
+ }
+
+ /**
+ * Delete a batch of topics.
+ *
+ * It may take several seconds after AdminClient#deleteTopics returns
+ * success for all the brokers to become aware that the topics are gone.
+ * During this time, AdminClient#listTopics and AdminClient#describeTopics
+ * may continue to return information about the deleted topics.
+ *
+ * If delete.topic.enable is false on the brokers, deleteTopics will mark
+ * the topics for deletion, but not actually delete them. The futures will
+ * return successfully in this case.
+ *
+ * @param topics The topic names to delete.
+ * @param options The options to use when deleting the topics.
+ * @return The DeleteTopicsResults.
+ */
+ public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+ /**
+ * List the topics available in the cluster with the default options.
+ *
+ * @return The ListTopicsResults.
+ */
+ public ListTopicsResults listTopics() {
+ return listTopics(new ListTopicsOptions());
+ }
+
+ /**
+ * List the topics available in the cluster.
+ *
+ * @param options The options to use when listing the topics.
+ * @return The ListTopicsResults.
+ */
+ public abstract ListTopicsResults listTopics(ListTopicsOptions options);
+
+ /**
+ * Descripe an individual topic in the cluster, with the default options.
+ *
+ * See {@link AdminClient#describeTopics(Collection<String>, DescribeTopicsOptions)}
+ *
+ * @param topicNames The names of the topics to describe.
+ *
+ * @return The DescribeTopicsResults.
+ */
+ public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
+ return describeTopics(topicNames, new DescribeTopicsOptions());
+ }
+
+ /**
+ * Descripe an individual topic in the cluster.
+ *
+ * Note that if auto.create.topics.enable is true on the brokers,
+ * AdminClient#describeTopic(topicName) may create a topic named topicName.
+ * There are two workarounds: either use AdminClient#listTopics and ensure
+ * that the topic is present before describing, or disable
+ * auto.create.topics.enable.
+ *
+ * @param topicNames The names of the topics to describe.
+ * @param options The options to use when describing the topic.
+ *
+ * @return The DescribeTopicsResults.
+ */
+ public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
+ DescribeTopicsOptions options);
+
+ /**
+ * Get information about the nodes in the cluster, using the default options.
+ *
+ * @return The DescribeClusterResults.
+ */
+ public DescribeClusterResults describeCluster() {
+ return describeCluster(new DescribeClusterOptions());
+ }
+
+ /**
+ * Get information about the nodes in the cluster.
+ *
+ * @param options The options to use when getting information about the cluster.
+ * @return The DescribeClusterResults.
+ */
+ public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);
+
+ /**
+ * Get information about the api versions of nodes in the cluster with the default options.
+ * See {@link AdminClient#apiVersions(Collection<Node>, ApiVersionsOptions)}
+ *
+ * @param nodes The nodes to get information about, or null to get information about all nodes.
+ * @return The ApiVersionsResults.
+ */
+ public ApiVersionsResults apiVersions(Collection<Node> nodes) {
+ return apiVersions(nodes, new ApiVersionsOptions());
+ }
+
+ /**
+ * Get information about the api versions of nodes in the cluster.
+ *
+ * @param nodes The nodes to get information about, or null to get information about all nodes.
+ * @param options The options to use when getting api versions of the nodes.
+ * @return The ApiVersionsResults.
+ */
+ public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
new file mode 100644
index 0000000..368a42e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * The AdminClient configuration keys
+ */
+public class AdminClientConfig extends AbstractConfig {
+ private static final ConfigDef CONFIG;
+
+ /**
+ * <code>bootstrap.servers</code>
+ */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+ private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
+
+ /**
+ * <code>reconnect.backoff.ms</code>
+ */
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+ private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+
+ /**
+ * <code>retry.backoff.ms</code>
+ */
+ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+ private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
+ "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
+ "some failure scenarios.";
+
+ /** <code>connections.max.idle.ms</code> */
+ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+ private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+
+ /** <code>request.timeout.ms</code> */
+ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+ private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+ private static final String CLIENT_ID_DOC = CommonClientConfigs.CLIENT_ID_DOC;
+
+ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+ private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
+ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+ private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
+
+ public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+ private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+ private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
+
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+ private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
+
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+ private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;
+
+ public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
+ public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+ public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+ private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+ private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+
+ static {
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ BOOTSTRAP_SERVERS_DOC)
+ .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+ .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+ .define(RECONNECT_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 50L,
+ atLeast(0L),
+ Importance.LOW,
+ RECONNECT_BACKOFF_MS_DOC)
+ .define(RETRY_BACKOFF_MS_CONFIG,
+ Type.LONG,
+ 100L,
+ atLeast(0L),
+ Importance.LOW,
+ RETRY_BACKOFF_MS_DOC)
+ .define(REQUEST_TIMEOUT_MS_CONFIG,
+ Type.INT,
+ 120000,
+ atLeast(0),
+ Importance.MEDIUM,
+ REQUEST_TIMEOUT_MS_DOC)
+ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+ Type.LONG,
+ 5 * 60 * 1000,
+ Importance.MEDIUM,
+ CONNECTIONS_MAX_IDLE_MS_DOC)
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+ .define(METRICS_RECORDING_LEVEL_CONFIG,
+ Type.STRING,
+ Sensor.RecordingLevel.INFO.toString(),
+ in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+ Importance.LOW,
+ METRICS_RECORDING_LEVEL_DOC)
+ // security support
+ .define(SECURITY_PROTOCOL_CONFIG,
+ Type.STRING,
+ DEFAULT_SECURITY_PROTOCOL,
+ Importance.MEDIUM,
+ SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport();
+ }
+
+ AdminClientConfig(Map<?, ?> props) {
+ super(CONFIG, props);
+ }
+
+ public static Set<String> configNames() {
+ return CONFIG.names();
+ }
+
+ public static void main(String[] args) {
+ System.out.println(CONFIG.toHtmlTable());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
new file mode 100644
index 0000000..cbcd234
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsOptions {
+ private Integer timeoutMs = null;
+
+ public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
new file mode 100644
index 0000000..456c64d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Results of the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsResults {
+ private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
+
+ ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
+ this.futures = futures;
+ }
+
+ public Map<Node, KafkaFuture<NodeApiVersions>> results() {
+ return futures;
+ }
+
+ public KafkaFuture<Map<Node, NodeApiVersions>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
+ @Override
+ public Map<Node, NodeApiVersions> apply(Void v) {
+ Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
+ for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
+ try {
+ versions.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return versions;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
new file mode 100644
index 0000000..03da7d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of newTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicResults {
+ private final Map<String, KafkaFuture<Void>> futures;
+
+ CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures, which can be used to check the status of individual
+ * topic creations.
+ */
+ public Map<String, KafkaFuture<Void>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds if all the topic creations succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
new file mode 100644
index 0000000..c1f3944
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for newTopics.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicsOptions {
+ private Integer timeoutMs = null;
+ private boolean validateOnly = false;
+
+ public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+
+ public CreateTopicsOptions validateOnly(boolean validateOnly) {
+ this.validateOnly = validateOnly;
+ return this;
+ }
+
+ public boolean validateOnly() {
+ return validateOnly;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
new file mode 100644
index 0000000..3dd4889
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the deleteTopics call.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicResults {
+ final Map<String, KafkaFuture<Void>> futures;
+
+ DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures which can be used to check the status of
+ * individual deletions.
+ */
+ public Map<String, KafkaFuture<Void>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic deletions succeed.
+ */
+ public KafkaFuture<Void> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
new file mode 100644
index 0000000..3630968
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for deleteTopics.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicsOptions {
+ private Integer timeoutMs = null;
+
+ public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
new file mode 100644
index 0000000..604ee13
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterOptions {
+ private Integer timeoutMs = null;
+
+ public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
new file mode 100644
index 0000000..5ee834b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The results of the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterResults {
+ private final KafkaFuture<Collection<Node>> future;
+
+ DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
+ this.future = future;
+ }
+
+ /**
+ * Returns a future which yields a collection of nodes.
+ */
+ public KafkaFuture<Collection<Node>> nodes() {
+ return future;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
new file mode 100644
index 0000000..1bf6632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeTopics.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsOptions {
+ private Integer timeoutMs = null;
+
+ public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
+
+ public Integer timeoutMs() {
+ return timeoutMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
new file mode 100644
index 0000000..630ba95
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopic call.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResults {
+ private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+ DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from topic names to futures which can be used to check the status of
+ * individual deletions.
+ */
+ public Map<String, KafkaFuture<TopicDescription>> results() {
+ return futures;
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic deletions succeed.
+ */
+ public KafkaFuture<Map<String, TopicDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+ @Override
+ public Map<String, TopicDescription> apply(Void v) {
+ Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+ for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+ try {
+ descriptions.put(entry.getKey(), entry.getValue().get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, because allOf ensured that all the futures
+ // completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ return descriptions;
+ }
+ });
+ }
+}