Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/01 23:24:48 UTC

[1/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Repository: kafka
Updated Branches:
  refs/heads/trunk c96656efb -> 4aed28d18


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 80e9191..65bec4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -83,114 +83,388 @@ import java.util.Map;
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
-    UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
-    NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1,
-            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2,
-            new CorruptRecordException("This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3,
-            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
-    INVALID_FETCH_SIZE(4,
-            new InvalidFetchSizeException("The requested fetch size is invalid.")),
-    LEADER_NOT_AVAILABLE(5,
-            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6,
-            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
-    REQUEST_TIMED_OUT(7,
-            new TimeoutException("The request timed out.")),
-    BROKER_NOT_AVAILABLE(8,
-            new BrokerNotAvailableException("The broker is not available.")),
-    REPLICA_NOT_AVAILABLE(9,
-            new ReplicaNotAvailableException("The replica is not available for the requested topic-partition")),
-    MESSAGE_TOO_LARGE(10,
-            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
-    STALE_CONTROLLER_EPOCH(11,
-            new ControllerMovedException("The controller moved to another broker.")),
-    OFFSET_METADATA_TOO_LARGE(12,
-            new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
-    NETWORK_EXCEPTION(13,
-            new NetworkException("The server disconnected before a response was received.")),
-    COORDINATOR_LOAD_IN_PROGRESS(14,
-            new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")),
-    COORDINATOR_NOT_AVAILABLE(15,
-            new CoordinatorNotAvailableException("The coordinator is not available.")),
-    NOT_COORDINATOR(16,
-            new NotCoordinatorException("This is not the correct coordinator.")),
-    INVALID_TOPIC_EXCEPTION(17,
-            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18,
-            new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
-    NOT_ENOUGH_REPLICAS(19,
-            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
-    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
-            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")),
-    INVALID_REQUIRED_ACKS(21,
-            new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
-    ILLEGAL_GENERATION(22,
-            new IllegalGenerationException("Specified group generation id is not valid.")),
+    UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownServerException(message);
+            }
+        }),
+    NONE(0, null,
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return null;
+            }
+        }),
+    OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OffsetOutOfRangeException(message);
+            }
+        }),
+    CORRUPT_MESSAGE(2, "This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CorruptRecordException(message);
+            }
+        }),
+    UNKNOWN_TOPIC_OR_PARTITION(3, "This server does not host this topic-partition.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownTopicOrPartitionException(message);
+            }
+        }),
+    INVALID_FETCH_SIZE(4, "The requested fetch size is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidFetchSizeException(message);
+            }
+        }),
+    LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new LeaderNotAvailableException(message);
+            }
+        }),
+    NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotLeaderForPartitionException(message);
+            }
+        }),
+    REQUEST_TIMED_OUT(7, "The request timed out.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TimeoutException(message);
+            }
+        }),
+    BROKER_NOT_AVAILABLE(8, "The broker is not available.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new BrokerNotAvailableException(message);
+            }
+        }),
+    REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ReplicaNotAvailableException(message);
+            }
+        }),
+    MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RecordTooLargeException(message);
+            }
+        }),
+    STALE_CONTROLLER_EPOCH(11, "The controller moved to another broker.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ControllerMovedException(message);
+            }
+        }),
+    OFFSET_METADATA_TOO_LARGE(12, "The metadata field of the offset request was too large.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OffsetMetadataTooLarge(message);
+            }
+        }),
+    NETWORK_EXCEPTION(13, "The server disconnected before a response was received.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NetworkException(message);
+            }
+        }),
+    COORDINATOR_LOAD_IN_PROGRESS(14, "The coordinator is loading and hence can't process requests.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CoordinatorLoadInProgressException(message);
+            }
+        }),
+    COORDINATOR_NOT_AVAILABLE(15, "The coordinator is not available.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new CoordinatorNotAvailableException(message);
+            }
+        }),
+    NOT_COORDINATOR(16, "This is not the correct coordinator.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotCoordinatorException(message);
+            }
+        }),
+    INVALID_TOPIC_EXCEPTION(17, "The request attempted to perform an operation on an invalid topic.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTopicException(message);
+            }
+        }),
+    RECORD_LIST_TOO_LARGE(18, "The request included message batch larger than the configured segment size on the server.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RecordBatchTooLargeException(message);
+            }
+        }),
+    NOT_ENOUGH_REPLICAS(19, "Messages are rejected since there are fewer in-sync replicas than required.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotEnoughReplicasException(message);
+            }
+        }),
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, "Messages are written to the log, but to fewer in-sync replicas than required.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotEnoughReplicasAfterAppendException(message);
+            }
+        }),
+    INVALID_REQUIRED_ACKS(21, "Produce request specified an invalid value for required acks.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidRequiredAcksException(message);
+            }
+        }),
+    ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new IllegalGenerationException(message);
+            }
+        }),
     INCONSISTENT_GROUP_PROTOCOL(23,
-            new InconsistentGroupProtocolException("The group member's supported protocols are incompatible with those of existing members.")),
-    INVALID_GROUP_ID(24,
-            new InvalidGroupIdException("The configured groupId is invalid")),
-    UNKNOWN_MEMBER_ID(25,
-            new UnknownMemberIdException("The coordinator is not aware of this member.")),
+            "The group member's supported protocols are incompatible with those of existing members.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InconsistentGroupProtocolException(message);
+            }
+        }),
+    INVALID_GROUP_ID(24, "The configured groupId is invalid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidGroupIdException(message);
+            }
+        }),
+    UNKNOWN_MEMBER_ID(25, "The coordinator is not aware of this member.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnknownMemberIdException(message);
+            }
+        }),
     INVALID_SESSION_TIMEOUT(26,
-            new InvalidSessionTimeoutException("The session timeout is not within the range allowed by the broker " +
-                    "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).")),
-    REBALANCE_IN_PROGRESS(27,
-            new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
-    INVALID_COMMIT_OFFSET_SIZE(28,
-            new InvalidCommitOffsetSizeException("The committing offset data size is not valid")),
-    TOPIC_AUTHORIZATION_FAILED(29,
-            new TopicAuthorizationException("Topic authorization failed.")),
-    GROUP_AUTHORIZATION_FAILED(30,
-            new GroupAuthorizationException("Group authorization failed.")),
-    CLUSTER_AUTHORIZATION_FAILED(31,
-            new ClusterAuthorizationException("Cluster authorization failed.")),
-    INVALID_TIMESTAMP(32,
-            new InvalidTimestampException("The timestamp of the message is out of acceptable range.")),
-    UNSUPPORTED_SASL_MECHANISM(33,
-            new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")),
-    ILLEGAL_SASL_STATE(34,
-            new IllegalSaslStateException("Request is not valid given the current SASL state.")),
-    UNSUPPORTED_VERSION(35,
-            new UnsupportedVersionException("The version of API is not supported.")),
-    TOPIC_ALREADY_EXISTS(36,
-            new TopicExistsException("Topic with this name already exists.")),
-    INVALID_PARTITIONS(37,
-            new InvalidPartitionsException("Number of partitions is invalid.")),
-    INVALID_REPLICATION_FACTOR(38,
-            new InvalidReplicationFactorException("Replication-factor is invalid.")),
-    INVALID_REPLICA_ASSIGNMENT(39,
-            new InvalidReplicaAssignmentException("Replica assignment is invalid.")),
-    INVALID_CONFIG(40,
-            new InvalidConfigurationException("Configuration is invalid.")),
-    NOT_CONTROLLER(41,
-        new NotControllerException("This is not the correct controller for this cluster.")),
-    INVALID_REQUEST(42,
-        new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
-            " the message was sent to an incompatible broker. See the broker logs for more details.")),
-    UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
-        new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44,
-        new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
-    OUT_OF_ORDER_SEQUENCE_NUMBER(45,
-        new OutOfOrderSequenceException("The broker received an out of order sequence number")),
-    DUPLICATE_SEQUENCE_NUMBER(46,
-        new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
-    INVALID_PRODUCER_EPOCH(47,
-        new ProducerFencedException("Producer attempted an operation with an old epoch")),
-    INVALID_TXN_STATE(48,
-        new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
-    INVALID_PID_MAPPING(49,
-        new InvalidPidMappingException("The PID mapping is invalid")),
-    INVALID_TRANSACTION_TIMEOUT(50,
-        new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
-            "(as configured by max.transaction.timeout.ms).")),
-    CONCURRENT_TRANSACTIONS(51,
-        new ConcurrentTransactionsException("The producer attempted to update a transaction " +
-             "while another concurrent operation on the same transaction was ongoing"));
+            "The session timeout is not within the range allowed by the broker " +
+            "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidSessionTimeoutException(message);
+            }
+        }),
+    REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new RebalanceInProgressException(message);
+            }
+        }),
+    INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidCommitOffsetSizeException(message);
+            }
+        }),
+    TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TopicAuthorizationException(message);
+            }
+        }),
+    GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new GroupAuthorizationException(message);
+            }
+        }),
+    CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ClusterAuthorizationException(message);
+            }
+        }),
+    INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTimestampException(message);
+            }
+        }),
+    UNSUPPORTED_SASL_MECHANISM(33, "The broker does not support the requested SASL mechanism.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedSaslMechanismException(message);
+            }
+        }),
+    ILLEGAL_SASL_STATE(34, "Request is not valid given the current SASL state.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new IllegalSaslStateException(message);
+            }
+        }),
+    UNSUPPORTED_VERSION(35, "The version of API is not supported.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedVersionException(message);
+            }
+        }),
+    TOPIC_ALREADY_EXISTS(36, "Topic with this name already exists.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TopicExistsException(message);
+            }
+        }),
+    INVALID_PARTITIONS(37, "Number of partitions is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidPartitionsException(message);
+            }
+        }),
+    INVALID_REPLICATION_FACTOR(38, "Replication-factor is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidReplicationFactorException(message);
+            }
+        }),
+    INVALID_REPLICA_ASSIGNMENT(39, "Replica assignment is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidReplicaAssignmentException(message);
+            }
+        }),
+    INVALID_CONFIG(40, "Configuration is invalid.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidConfigurationException(message);
+            }
+        }),
+    NOT_CONTROLLER(41, "This is not the correct controller for this cluster.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new NotControllerException(message);
+            }
+        }),
+    INVALID_REQUEST(42, "This most likely occurs because of a request being malformed by the " +
+                "client library or the message was sent to an incompatible broker. See the broker logs " +
+                "for more details.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidRequestException(message);
+            }
+        }),
+    UNSUPPORTED_FOR_MESSAGE_FORMAT(43, "The message format version on the broker does not support the request.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new UnsupportedForMessageFormatException(message);
+            }
+        }),
+    POLICY_VIOLATION(44, "Request parameters do not satisfy the configured policy.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new PolicyViolationException(message);
+            }
+        }),
+    OUT_OF_ORDER_SEQUENCE_NUMBER(45, "The broker received an out of order sequence number",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new OutOfOrderSequenceException(message);
+            }
+        }),
+    DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new DuplicateSequenceNumberException(message);
+            }
+        }),
+    INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ProducerFencedException(message);
+            }
+        }),
+    INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTxnStateException(message);
+            }
+        }),
+    INVALID_PID_MAPPING(49, "The PID mapping is invalid",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidPidMappingException(message);
+            }
+        }),
+    INVALID_TRANSACTION_TIMEOUT(50, "The transaction timeout is larger than the maximum value allowed by " +
+                "the broker (as configured by max.transaction.timeout.ms).",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new InvalidTxnTimeoutException(message);
+            }
+        }),
+    CONCURRENT_TRANSACTIONS(51, "The producer attempted to update a transaction " +
+                "while another concurrent operation on the same transaction was ongoing",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new ConcurrentTransactionsException(message);
+            }
+        });
+
+    private interface ApiExceptionBuilder {
+        ApiException build(String message);
+    }
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
@@ -206,11 +480,13 @@ public enum Errors {
     }
 
     private final short code;
+    private final ApiExceptionBuilder builder;
     private final ApiException exception;
 
-    Errors(int code, ApiException exception) {
+    Errors(int code, String defaultExceptionString, ApiExceptionBuilder builder) {
         this.code = (short) code;
-        this.exception = exception;
+        this.builder = builder;
+        this.exception = builder.build(defaultExceptionString);
     }
 
     /**
@@ -221,6 +497,21 @@ public enum Errors {
     }
 
     /**
+     * Create an instance of the ApiException that contains the given error message.
+     *
+     * @param message    The message string to set.
+     * @return           The exception.
+     */
+    public ApiException exception(String message) {
+        if (message == null) {
+            // If no error message was specified, return an exception with the default error message.
+            return exception;
+        }
+        // Return an exception with the given error message.
+        return builder.build(message);
+    }
+
+    /**
      * Returns the class name of the exception or null if this is {@code Errors.NONE}.
      */
     public String exceptionName() {
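
The refactoring above keeps the long-standing no-argument exception() accessor while adding the message-aware exception(String) overload. A minimal sketch of using both (the topic name in the message is illustrative):

    import org.apache.kafka.common.protocol.Errors;

    public class ErrorsExample {
        public static void main(String[] args) {
            // Default message, same behaviour as before this patch.
            System.out.println(
                Errors.UNKNOWN_TOPIC_OR_PARTITION.exception().getMessage());
            // New overload: a caller-supplied message replaces the default;
            // passing null falls back to the default string.
            System.out.println(
                Errors.UNKNOWN_TOPIC_OR_PARTITION
                    .exception("Topic 'foo' not found.").getMessage());
        }
    }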

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 54f9764..2c2b2dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 
+import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -61,6 +62,10 @@ public class CreateTopicsResponse extends AbstractResponse {
             return message;
         }
 
+        public ApiException exception() {
+            return error.exception(message);
+        }
+
         @Override
         public String toString() {
             return "Error(error=" + error + ", message=" + message + ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index bd79653..017fdf4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -260,7 +260,8 @@ public class MetadataResponse extends AbstractResponse {
             }
         }
 
-        return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
+        return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
+                internalTopics, this.controller);
     }
 
     /**
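
Threading the controller through to Cluster lets the new admin client discover which broker should receive controller-only requests such as CreateTopics. A small sketch, assuming the updated Cluster exposes a controller() accessor matching the new constructor argument:

    // Assumed context: a freshly parsed MetadataResponse `metadataResponse`.
    Cluster cluster = metadataResponse.cluster();
    Node controller = cluster.controller();   // may be null if not yet known
    // Route controller-bound requests (e.g. CreateTopicsRequest) to this node.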

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 1a4de98..cb0ff89 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -659,7 +659,7 @@ public class Utils {
     /**
      * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
      */
-    public static void closeQuietly(Closeable closeable, String name) {
+    public static void closeQuietly(AutoCloseable closeable, String name) {
         if (closeable != null) {
             try {
                 closeable.close();
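
Since Closeable extends AutoCloseable, widening the parameter keeps every existing call site compiling while letting AutoCloseable-only resources, such as the new AdminClient, reuse the same helper. A minimal sketch:

    import org.apache.kafka.common.utils.Utils;
    import java.io.ByteArrayInputStream;

    public class CloseQuietlyExample {
        public static void main(String[] args) {
            // A Closeable still works, because Closeable extends AutoCloseable.
            Utils.closeQuietly(new ByteArrayInputStream(new byte[0]), "stream");
            // An AutoCloseable-only resource now works too.
            AutoCloseable resource = new AutoCloseable() {
                @Override
                public void close() { }
            };
            Utils.closeQuietly(resource, "resource");
        }
    }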

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
new file mode 100644
index 0000000..36cb8e8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A unit test for KafkaAdminClient.
+ *
+ * See KafkaAdminClientIntegrationTest for an integration test of the
+ * admin client.
+ */
+public class KafkaAdminClientTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testGetOrCreateListValue() {
+        Map<String, List<String>> map = new HashMap<>();
+        List<String> fooList = KafkaAdminClient.getOrCreateListValue(map, "foo");
+        assertNotNull(fooList);
+        fooList.add("a");
+        fooList.add("b");
+        List<String> fooList2 = KafkaAdminClient.getOrCreateListValue(map, "foo");
+        assertEquals(fooList, fooList2);
+        assertTrue(fooList2.contains("a"));
+        assertTrue(fooList2.contains("b"));
+        List<String> barList = KafkaAdminClient.getOrCreateListValue(map, "bar");
+        assertNotNull(barList);
+        assertTrue(barList.isEmpty());
+    }
+
+    @Test
+    public void testCalcTimeoutMsRemainingAsInt() {
+        assertEquals(0, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1000));
+        assertEquals(100, KafkaAdminClient.calcTimeoutMsRemainingAsInt(1000, 1100));
+        assertEquals(Integer.MAX_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(0, Long.MAX_VALUE));
+        assertEquals(Integer.MIN_VALUE, KafkaAdminClient.calcTimeoutMsRemainingAsInt(Long.MAX_VALUE, 0));
+    }
+
+    @Test
+    public void testPrettyPrintException() {
+        assertEquals("Null exception.", KafkaAdminClient.prettyPrintException(null));
+        assertEquals("TimeoutException", KafkaAdminClient.prettyPrintException(new TimeoutException()));
+        assertEquals("TimeoutException: The foobar timed out.",
+            KafkaAdminClient.prettyPrintException(new TimeoutException("The foobar timed out.")));
+    }
+
+    private static Map<String, Object> newStrMap(String... vals) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+        map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
+        if (vals.length % 2 != 0) {
+            throw new IllegalStateException();
+        }
+        for (int i = 0; i < vals.length; i += 2) {
+            map.put(vals[i], vals[i + 1]);
+        }
+        return map;
+    }
+
+    private static AdminClientConfig newConfMap(String... vals) {
+        return new AdminClientConfig(newStrMap(vals));
+    }
+
+    @Test
+    public void testGenerateClientId() {
+        Set<String> ids = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String id = KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, ""));
+            assertTrue("Got duplicate id " + id, !ids.contains(id));
+            ids.add(id);
+        }
+        assertEquals("myCustomId",
+            KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId")));
+    }
+
+    private static class MockKafkaAdminClientContext implements AutoCloseable {
+        final static String CLUSTER_ID = "mockClusterId";
+        final AdminClientConfig adminClientConfig;
+        final Metadata metadata;
+        final HashMap<Integer, Node> nodes;
+        final MockClient mockClient;
+        final AdminClient client;
+        Cluster cluster;
+
+        MockKafkaAdminClientContext(Map<String, Object> config) {
+            this.adminClientConfig = new AdminClientConfig(config);
+            this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            this.nodes = new HashMap<Integer, Node>();
+            this.nodes.put(0, new Node(0, "localhost", 8121));
+            this.nodes.put(1, new Node(1, "localhost", 8122));
+            this.nodes.put(2, new Node(2, "localhost", 8123));
+            this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
+            this.client = KafkaAdminClient.create(adminClientConfig, mockClient, metadata);
+            this.cluster = new Cluster(CLUSTER_ID,  nodes.values(),
+                Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+        }
+
+        @Override
+        public void close() {
+            this.client.close();
+        }
+    }
+
+    @Test
+    public void testCloseAdminClient() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+        }
+    }
+
+    private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass)
+        throws InterruptedException {
+        try {
+            future.get();
+            fail("Expected a " + exceptionClass.getSimpleName() + " exception, but got success.");
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            assertEquals("Expected a " + exceptionClass.getSimpleName() + " exception, but got " +
+                cause.getClass().getSimpleName(),
+                exceptionClass, cause.getClass());
+        }
+    }
+
+    /**
+     * Test that the client properly times out when we don't receive any metadata.
+     */
+    @Test
+    public void testTimeoutWithoutMetadata() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap(
+            AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.setNode(new Node(0, "localhost", 8121));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+                    put("myTopic", new Error(Errors.NONE, ""));
+                }}));
+            KafkaFuture<Void> future = ctx.client.
+                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+                    }})), new CreateTopicsOptions().timeoutMs(1000)).all();
+            assertFutureError(future, TimeoutException.class);
+        }
+    }
+
+    @Test
+    public void testCreateTopics() throws Exception {
+        try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) {
+            ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
+            ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet());
+            ctx.mockClient.setNode(ctx.nodes.get(0));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{
+                    put("myTopic", new Error(Errors.NONE, ""));
+                }}));
+            KafkaFuture<Void> future = ctx.client.
+                createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{
+                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2}));
+                    }})), new CreateTopicsOptions().timeoutMs(10000)).all();
+            future.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
new file mode 100644
index 0000000..39868e0
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A unit test for KafkaFuture.
+ */
+public class KafkaFutureTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testCompleteFutures() throws Exception {
+        KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
+        assertTrue(future123.complete(123));
+        assertEquals(Integer.valueOf(123), future123.get());
+        assertFalse(future123.complete(456));
+        assertTrue(future123.isDone());
+        assertFalse(future123.isCancelled());
+        assertFalse(future123.isCompletedExceptionally());
+
+        KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
+        assertEquals(Integer.valueOf(456), future456.get());
+
+        KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
+        futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
+        try {
+            futureFail.get();
+            Assert.fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(RuntimeException.class, e.getCause().getClass());
+            Assert.assertEquals("We require more vespene gas", e.getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void testCompletingFutures() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        CompleterThread myThread = new CompleterThread(future, "You must construct additional pylons.");
+        assertFalse(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+        assertEquals("I am ready", future.getNow("I am ready"));
+        myThread.start();
+        String str = future.get(5, TimeUnit.MINUTES);
+        assertEquals("You must construct additional pylons.", str);
+        assertEquals("You must construct additional pylons.", future.getNow("I am ready"));
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+        myThread.join();
+        assertEquals(null, myThread.testException);
+    }
+
+    private static class CompleterThread<T> extends Thread {
+        private final KafkaFutureImpl<T> future;
+        private final T value;
+        Throwable testException = null;
+
+        CompleterThread(KafkaFutureImpl<T> future, T value) {
+            this.future = future;
+            this.value = value;
+        }
+
+        @Override
+        public void run() {
+            try {
+                try {
+                    Thread.sleep(0, 200);
+                } catch (InterruptedException e) {
+                }
+                future.complete(value);
+            } catch (Throwable testException) {
+                this.testException = testException;
+            }
+        }
+    }
+
+    private static class WaiterThread<T> extends Thread {
+        private final KafkaFutureImpl<T> future;
+        private final T expected;
+        Throwable testException = null;
+
+        WaiterThread(KafkaFutureImpl<T> future, T expected) {
+            this.future = future;
+            this.expected = expected;
+        }
+
+        @Override
+        public void run() {
+            try {
+                T value = future.get();
+                assertEquals(expected, value);
+            } catch (Throwable testException) {
+                this.testException = testException;
+            }
+        }
+    }
+
+    @Test
+    public void testAllOfFutures() throws Exception {
+        final int numThreads = 5;
+        final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            futures.add(new KafkaFutureImpl<Integer>());
+        }
+        KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
+        final List<CompleterThread> completerThreads = new ArrayList<>();
+        final List<WaiterThread> waiterThreads = new ArrayList<>();
+        for (int i = 0; i < numThreads; i++) {
+            completerThreads.add(new CompleterThread<>(futures.get(i), i));
+            waiterThreads.add(new WaiterThread<>(futures.get(i), i));
+        }
+        assertFalse(allFuture.isDone());
+        for (int i = 0; i < numThreads; i++) {
+            waiterThreads.get(i).start();
+        }
+        for (int i = 0; i < numThreads - 1; i++) {
+            completerThreads.get(i).start();
+        }
+        assertFalse(allFuture.isDone());
+        completerThreads.get(numThreads - 1).start();
+        allFuture.get();
+        assertTrue(allFuture.isDone());
+        for (int i = 0; i < numThreads; i++) {
+            assertEquals(Integer.valueOf(i), futures.get(i).get());
+        }
+        for (int i = 0; i < numThreads; i++) {
+            completerThreads.get(i).join();
+            waiterThreads.get(i).join();
+            assertEquals(null, completerThreads.get(i).testException);
+            assertEquals(null, waiterThreads.get(i).testException);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 7342bbb..d297b70 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -40,6 +40,11 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 import scala.collection.JavaConverters._
 import scala.util.Try
 
+/**
+  * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
+  * and configurations.  This client is deprecated, and will be replaced by KafkaAdminClient.
+  * @see KafkaAdminClient
+  */
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
                   val retryBackoffMs: Long,
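
For reference, creating a topic with the replacement Java client added by this patch might look as follows. This is a minimal sketch, assuming a broker reachable at localhost:9092 and that AdminClient.create accepts the same kind of config map the integration test builds; the replica assignment mirrors the one used in the unit test above:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateTopicsOptions;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateTopicExample {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // AdminClient is AutoCloseable, so try-with-resources works.
            try (AdminClient client = AdminClient.create(conf)) {
                // One partition (id 0) with replicas on brokers 0, 1 and 2.
                NewTopic topic = new NewTopic("my-topic",
                    Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
                // all() returns a KafkaFuture<Void>; get() blocks until done.
                client.createTopics(Collections.singleton(topic),
                    new CreateTopicsOptions().timeoutMs(10000)).all().get();
            }
        }
    }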

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
deleted file mode 100644
index ff9fef0..0000000
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-
-import kafka.admin.AdminClient
-import kafka.admin.AdminClient.DeleteRecordsResult
-import kafka.server.KafkaConfig
-import java.lang.{Long => JLong}
-import kafka.utils.{Logging, TestUtils}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
-import org.apache.kafka.common.requests.DeleteRecordsRequest
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-import scala.collection.JavaConverters._
-
-class AdminClientTest extends IntegrationTestHarness with Logging {
-
-  val producerCount = 1
-  val consumerCount = 2
-  val serverCount = 3
-  val groupId = "my-test"
-  val clientId = "consumer-498"
-
-  val topic = "topic"
-  val part = 0
-  val tp = new TopicPartition(topic, part)
-  val part2 = 1
-  val tp2 = new TopicPartition(topic, part2)
-
-  var client: AdminClient = null
-
-  // configure the servers and clients
-  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
-  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
-  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
-  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
-  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    client = AdminClient.createSimplePlaintext(this.brokerList)
-    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
-  }
-
-  @After
-  override def tearDown() {
-    client.close()
-    super.tearDown()
-  }
-
-  @Test
-  def testSeekToBeginningAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(0L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(5L, consumer.position(tp))
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(10L, consumer.position(tp))
-  }
-
-  @Test
-  def testConsumeAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    var messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 10
-    }, "Expected 10 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 7
-    }, "Expected 7 messages", 3000L)
-
-    client.deleteRecordsBefore(Map((tp, 8L))).get()
-    consumer.seek(tp, 1)
-    messageCount = 0
-    TestUtils.waitUntilTrue(() => {
-      messageCount += consumer.poll(0).count()
-      messageCount == 2
-    }, "Expected 2 messages", 3000L)
-  }
-
-  @Test
-  def testLogStartOffsetCheckpoint() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-
-    for (i <- 0 until serverCount)
-      killBroker(i)
-    restartDeadBrokers()
-
-    client.close()
-    brokerList = TestUtils.bootstrapServers(servers, listenerName)
-    client = AdminClient.createSimplePlaintext(brokerList)
-
-    TestUtils.waitUntilTrue(() => {
-      // Need to retry if leader is not available for the partition
-      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
-    }, "Expected low watermark of the partition to be 5L")
-  }
-
-  @Test
-  def testLogStartOffsetAfterDeleteRecords() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    client.deleteRecordsBefore(Map((tp, 3L))).get()
-
-    for (i <- 0 until serverCount)
-      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
-  }
-
-  @Test
-  def testOffsetsForTimesAfterDeleteRecords() {
-    val consumer = consumers.head
-    subscribeAndWaitForAssignment(topic, consumer)
-
-    sendRecords(producers.head, 10, tp)
-    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, 5L))).get()
-    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
-
-    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
-    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
-  }
-
-  @Test
-  def testDeleteRecordsWithException() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    sendRecords(producers.head, 10, tp)
-    // Should get success result
-    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
-    // OffsetOutOfRangeException if offset > high_watermark
-    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
-
-    val nonExistPartition = new TopicPartition(topic, 3)
-    // UnknownTopicOrPartitionException if user tries to delete records of a non-existent partition
-    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
-                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
-  }
-
-  @Test
-  def testListGroups() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val groups = client.listAllGroupsFlattened
-    assertFalse(groups.isEmpty)
-    val group = groups.head
-    assertEquals(groupId, group.groupId)
-    assertEquals("consumer", group.protocolType)
-  }
-
-  @Test
-  def testListAllBrokerVersionInfo() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val brokerVersionInfos = client.listAllBrokerVersionInfo
-    val brokers = brokerList.split(",")
-    assertEquals(brokers.size, brokerVersionInfos.size)
-    for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
-      val hostStr = s"${node.host}:${node.port}"
-      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
-      val brokerVersionInfo = tryBrokerVersionInfo.get
-      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
-    }
-  }
-
-  @Test
-  def testGetConsumerGroupSummary() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val group = client.describeConsumerGroup(groupId)
-    assertEquals("range", group.assignmentStrategy)
-    assertEquals("Stable", group.state)
-    assertFalse(group.consumers.isEmpty)
-
-    val member = group.consumers.get.head
-    assertEquals(clientId, member.clientId)
-    assertFalse(member.host.isEmpty)
-    assertFalse(member.consumerId.isEmpty)
-  }
-
-  @Test
-  def testDescribeConsumerGroup() {
-    subscribeAndWaitForAssignment(topic, consumers.head)
-
-    val consumerGroupSummary = client.describeConsumerGroup(groupId)
-    assertEquals(1, consumerGroupSummary.consumers.get.size)
-    assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
-  }
-
-  @Test
-  def testDescribeConsumerGroupForNonExistentGroup() {
-    val nonExistentGroup = "non" + groupId
-    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
-  }
-
-  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
-    consumer.subscribe(Collections.singletonList(topic))
-    TestUtils.waitUntilTrue(() => {
-      consumer.poll(0)
-      !consumer.assignment.isEmpty
-    }, "Expected non-empty assignment")
-  }
-
-  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
-                          numRecords: Int,
-                          tp: TopicPartition) {
-    val futures = (0 until numRecords).map { i =>
-      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
-      debug(s"Sending this record: $record")
-      producer.send(record)
-    }
-
-    futures.foreach(_.get)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..9c5468f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import org.apache.kafka.common.utils.Utils
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import org.apache.kafka.clients.admin._
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.common.KafkaFuture
+import org.apache.kafka.common.errors.TopicExistsException
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.{After, Rule, Test}
+import org.junit.rules.Timeout
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+/**
+ * An integration test of the KafkaAdminClient.
+ *
+ * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client.
+ */
+class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
+
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  var client: AdminClient = null
+
+  @After
+  def closeClient(): Unit = {
+    if (client != null)
+      Utils.closeQuietly(client, "AdminClient")
+  }
+
+  val brokerCount = 3
+  lazy val serverConfig = new Properties
+
+  def createConfig(): util.Map[String, Object] = {
+    val config = new util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+    TestUtils.waitUntilTrue(() => {
+        val topics = client.listTopics().names().get()
+        expectedPresent.forall(topicName => topics.contains(topicName)) &&
+          expectedMissing.forall(topicName => !topics.contains(topicName))
+      }, "timed out waiting for topics")
+  }
+
+  def assertFutureExceptionTypeEquals(future: KafkaFuture[_], clazz: Class[_ <: Throwable]): Unit = {
+    try {
+      future.get()
+      fail("Expected CompletableFuture.get to return an exception")
+    } catch {
+      case e: ExecutionException =>
+        val cause = e.getCause()
+        assertTrue("Expected an exception of type " + clazz.getName + "; got type " +
+            cause.getClass().getName, clazz.isInstance(cause))
+    }
+  }
+
+  @Test
+  def testClose(): Unit = {
+    val client = AdminClient.create(createConfig())
+    client.close()
+    client.close() // double close has no effect
+  }
+
+  @Test
+  def testListNodes(): Unit = {
+    client = AdminClient.create(createConfig())
+    val brokerStrs = brokerList.split(",").toList.sorted
+    var nodeStrs: List[String] = null
+    do {
+      val nodes = client.describeCluster().nodes().get().asScala
+      nodeStrs = nodes.map(node => s"${node.host}:${node.port}").toList.sorted
+    } while (nodeStrs.size < brokerStrs.size)
+    assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(","))
+    client.close()
+  }
+
+  @Test
+  def testCreateDeleteTopics(): Unit = {
+    client = AdminClient.create(createConfig())
+    val newTopics : List[NewTopic] = List(
+        new NewTopic("mytopic", 1, 1),
+        new NewTopic("mytopic2", 1, 1))
+    client.createTopics(newTopics.asJava,
+      new CreateTopicsOptions().validateOnly(true)).all().get()
+    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+    client.createTopics(newTopics.asJava).all().get()
+    waitForTopics(client, List("mytopic", "mytopic2"), List())
+
+    val results = client.createTopics(newTopics.asJava).results()
+    assert(results.containsKey("mytopic"))
+    assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
+    assert(results.containsKey("mytopic2"))
+    assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
+
+    val deleteTopics : Set[String] = Set("mytopic", "mytopic2")
+    client.deleteTopics(deleteTopics.asJava).all().get()
+    waitForTopics(client, List(), List("mytopic", "mytopic2"))
+
+    client.close()
+  }
+
+  @Test
+  def testGetAllBrokerVersions(): Unit = {
+    client = AdminClient.create(createConfig())
+    val nodes = client.describeCluster().nodes().get()
+    val nodesToVersions = client.apiVersions(nodes).all().get()
+    val brokers = brokerList.split(",")
+    assert(brokers.size == nodesToVersions.size())
+    for ((node, brokerVersionInfo) <- nodesToVersions.asScala) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+    client.close()
+  }
+
+  override def generateConfigs() = {
+    val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
+    cfgs.foreach { config =>
+      config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
+      config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
+      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
+      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
+      config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+    }
+    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.map(KafkaConfig.fromProps)
+  }
+}
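
For reference, a minimal sketch of the new Java API as exercised by the integration
test above. Only calls that appear in this patch are used; the bootstrap address and
topic name are placeholders:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateTopicsOptions;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminClientUsageSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> config = new HashMap<>();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            AdminClient client = AdminClient.create(config);
            try {
                NewTopic topic = new NewTopic("example-topic", 1, 1);
                // Dry run: validateOnly(true) checks the request without creating anything.
                client.createTopics(Collections.singleton(topic),
                    new CreateTopicsOptions().validateOnly(true)).all().get();
                // Create for real, then confirm via a topic listing.
                client.createTopics(Collections.singleton(topic)).all().get();
                System.out.println(client.listTopics().names().get());
                client.deleteTopics(Collections.singleton("example-topic")).all().get();
            } finally {
                client.close();
            }
        }
    }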

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
new file mode 100644
index 0000000..434a47f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.admin.AdminClient
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.server.KafkaConfig
+import java.lang.{Long => JLong}
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, ProducerConfig}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.requests.DeleteRecordsRequest
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConverters._
+
+/**
+  * Tests for the deprecated Scala AdminClient.
+  */
+class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+  val groupId = "my-test"
+  val clientId = "consumer-498"
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val part2 = 1
+  val tp2 = new TopicPartition(topic, part2)
+
+  var client: AdminClient = null
+
+  // configure the servers and clients
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    client = AdminClient.createSimplePlaintext(this.brokerList)
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @After
+  override def tearDown() {
+    client.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testSeekToBeginningAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(0L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(5L, consumer.position(tp))
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    consumer.seekToBeginning(Collections.singletonList(tp))
+    assertEquals(10L, consumer.position(tp))
+  }
+
+  @Test
+  def testConsumeAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    var messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 10
+    }, "Expected 10 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 7
+    }, "Expected 7 messages", 3000L)
+
+    client.deleteRecordsBefore(Map((tp, 8L))).get()
+    consumer.seek(tp, 1)
+    messageCount = 0
+    TestUtils.waitUntilTrue(() => {
+      messageCount += consumer.poll(0).count()
+      messageCount == 2
+    }, "Expected 2 messages", 3000L)
+  }
+
+  @Test
+  def testLogStartOffsetCheckpoint() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+
+    for (i <- 0 until serverCount)
+      killBroker(i)
+    restartDeadBrokers()
+
+    client.close()
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
+    client = AdminClient.createSimplePlaintext(brokerList)
+
+    TestUtils.waitUntilTrue(() => {
+      // Need to retry if leader is not available for the partition
+      client.deleteRecordsBefore(Map((tp, 0L))).get(1000L, TimeUnit.MILLISECONDS)(tp).equals(DeleteRecordsResult(5L, null))
+    }, "Expected low watermark of the partition to be 5L")
+  }
+
+  @Test
+  def testLogStartOffsetAfterDeleteRecords() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    client.deleteRecordsBefore(Map((tp, 3L))).get()
+
+    for (i <- 0 until serverCount)
+      assertEquals(3, servers(i).replicaManager.getReplica(tp).get.logStartOffset)
+  }
+
+  @Test
+  def testOffsetsForTimesAfterDeleteRecords() {
+    val consumer = consumers.head
+    subscribeAndWaitForAssignment(topic, consumer)
+
+    sendRecords(producers.head, 10, tp)
+    assertEquals(0L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, 5L))).get()
+    assertEquals(5L, consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp).offset())
+
+    client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
+    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+  }
+
+  @Test
+  def testDeleteRecordsWithException() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    sendRecords(producers.head, 10, tp)
+    // Should get success result
+    assertEquals(DeleteRecordsResult(5L, null), client.deleteRecordsBefore(Map((tp, 5L))).get()(tp))
+    // OffsetOutOfRangeException if offset > high_watermark
+    assertEquals(DeleteRecordsResult(-1L, Errors.OFFSET_OUT_OF_RANGE.exception()), client.deleteRecordsBefore(Map((tp, 20))).get()(tp))
+
+    val nonExistPartition = new TopicPartition(topic, 3)
+    // LeaderNotAvailableException if the user tries to delete records of a non-existent partition
+    assertEquals(DeleteRecordsResult(-1L, Errors.LEADER_NOT_AVAILABLE.exception()),
+                 client.deleteRecordsBefore(Map((nonExistPartition, 20))).get()(nonExistPartition))
+  }
+
+  @Test
+  def testListGroups() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val groups = client.listAllGroupsFlattened
+    assertFalse(groups.isEmpty)
+    val group = groups.head
+    assertEquals(groupId, group.groupId)
+    assertEquals("consumer", group.protocolType)
+  }
+
+  @Test
+  def testListAllBrokerVersionInfo() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val brokerVersionInfos = client.listAllBrokerVersionInfo
+    val brokers = brokerList.split(",")
+    assertEquals(brokers.size, brokerVersionInfos.size)
+    for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      val brokerVersionInfo = tryBrokerVersionInfo.get
+      assertEquals(1, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+  }
+
+  @Test
+  def testGetConsumerGroupSummary() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val group = client.describeConsumerGroup(groupId)
+    assertEquals("range", group.assignmentStrategy)
+    assertEquals("Stable", group.state)
+    assertFalse(group.consumers.isEmpty)
+
+    val member = group.consumers.get.head
+    assertEquals(clientId, member.clientId)
+    assertFalse(member.host.isEmpty)
+    assertFalse(member.consumerId.isEmpty)
+  }
+
+  @Test
+  def testDescribeConsumerGroup() {
+    subscribeAndWaitForAssignment(topic, consumers.head)
+
+    val consumerGroupSummary = client.describeConsumerGroup(groupId)
+    assertEquals(1, consumerGroupSummary.consumers.get.size)
+    assertEquals(List(tp, tp2), consumerGroupSummary.consumers.get.flatMap(_.assignment))
+  }
+
+  @Test
+  def testDescribeConsumerGroupForNonExistentGroup() {
+    val nonExistentGroup = "non" + groupId
+    assertTrue("Expected empty ConsumerSummary list", client.describeConsumerGroup(nonExistentGroup).consumers.get.isEmpty)
+  }
+
+  private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
+    consumer.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(0)
+      !consumer.assignment.isEmpty
+    }, "Expected non-empty assignment")
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                          numRecords: Int,
+                          tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    }
+
+    futures.foreach(_.get)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
new file mode 100644
index 0000000..f20ed0f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -0,0 +1,26 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.io.File
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.server.KafkaConfig
+
+class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslTestHarness {
+  override protected val zkSaslEnabled = true
+  this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 02b5fe3..e826c7f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -561,6 +561,9 @@ object TestUtils extends Logging {
   def consumerSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
     securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "consumer", saslProperties)
 
+  def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
+    securityConfigs(Mode.CLIENT, securityProtocol, trustStoreFile, "admin-client", saslProperties)
+
   /**
    * Create a new consumer with a few pre-configured properties.
    */


[2/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Posted by ij...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
new file mode 100644
index 0000000..cf37aa7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+
+/**
+ * An administrative client for Kafka which supports managing and inspecting topics, brokers,
+ * and configurations.
+ */
+@InterfaceStability.Unstable
+public class KafkaAdminClient extends AdminClient {
+    private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
+
+    /**
+     * The maximum number of times to retry a call before failing it.
+     */
+    private static final int MAX_CALL_RETRIES = 5;
+
+    /**
+     * The next integer used to generate a client id when the user has not specified one explicitly.
+     */
+    private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
+    /**
+     * The prefix to use for the JMX metrics for this class
+     */
+    private static final String JMX_PREFIX = "kafka.admin.client";
+
+    /**
+     * The default timeout to use for an operation.
+     */
+    private final int defaultTimeoutMs;
+
+    /**
+     * The name of this AdminClient instance.
+     */
+    private final String clientId;
+
+    /**
+     * Provides the time.
+     */
+    private final Time time;
+
+    /**
+     * The cluster metadata used by the KafkaClient.
+     */
+    private final Metadata metadata;
+
+    /**
+     * The metrics for this KafkaAdminClient.
+     */
+    private final Metrics metrics;
+
+    /**
+     * The network client to use.
+     */
+    private final KafkaClient client;
+
+    /**
+     * The runnable used in the service thread for this admin client.
+     */
+    private final AdminClientRunnable runnable;
+
+    /**
+     * The network service thread for this admin client.
+     */
+    private final Thread thread;
+
+    /**
+     * True if this client is closed.
+     */
+    private volatile boolean closed = false;
+
+    /**
+     * Get or create a list value from a map.
+     *
+     * @param map   The map to get or create the element from.
+     * @param key   The key.
+     * @param <K>   The key type.
+     * @param <V>   The value type.
+     * @return      The list value.
+     */
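+    // Note: on Java 8 this is equivalent to map.computeIfAbsent(key, k -> new LinkedList<>());
+    // it is spelled out longhand here, presumably to keep the clients module on Java 7.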
+    static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
+        List<V> list = map.get(key);
+        if (list != null)
+            return list;
+        list = new LinkedList<>();
+        map.put(key, list);
+        return list;
+    }
+
+    /**
+     * Send an exception to every element in a collection of KafkaFutureImpls.
+     *
+     * @param futures   The collection of KafkaFutureImpl objects.
+     * @param exc       The exception
+     * @param <T>       The KafkaFutureImpl result type.
+     */
+    private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) {
+        for (KafkaFutureImpl<?> future : futures) {
+            future.completeExceptionally(exc);
+        }
+    }
+
+    /**
+     * Get the current time remaining before a deadline as an integer.
+     *
+     * @param now           The current time in milliseconds.
+     * @param deadlineMs    The deadline time in milliseconds.
+     * @return              The time delta in milliseconds.
+     */
+    static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
+        long deltaMs = deadlineMs - now;
+        if (deltaMs > Integer.MAX_VALUE)
+            deltaMs = Integer.MAX_VALUE;
+        else if (deltaMs < Integer.MIN_VALUE)
+            deltaMs = Integer.MIN_VALUE;
+        return (int) deltaMs;
+    }
+
+    /**
+     * Generate the client id based on the configuration.
+     *
+     * @param config    The configuration
+     *
+     * @return          The client id
+     */
+    static String generateClientId(AdminClientConfig config) {
+        String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
+        if (!clientId.isEmpty())
+            return clientId;
+        return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+    }
+
+    /**
+     * Get the deadline for a particular call.
+     *
+     * @param now               The current time in milliseconds.
+     * @param optionTimeoutMs   The timeout option given by the user.
+     *
+     * @return                  The deadline in milliseconds.
+     */
+    private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
+        if (optionTimeoutMs != null)
+            return now + Math.max(0, optionTimeoutMs);
+        return now + defaultTimeoutMs;
+    }
+
+    /**
+     * Pretty-print an exception.
+     *
+     * @param throwable     The exception.
+     *
+     * @return              A compact human-readable string.
+     */
+    static String prettyPrintException(Throwable throwable) {
+        if (throwable == null)
+            return "Null exception.";
+        if (throwable.getMessage() != null) {
+            return throwable.getClass().getSimpleName() + ": " + throwable.getMessage();
+        }
+        return throwable.getClass().getSimpleName();
+    }
+
+    static KafkaAdminClient create(AdminClientConfig config) {
+        Metadata metadata = null;
+        Metrics metrics = null;
+        NetworkClient networkClient = null;
+        Time time = Time.SYSTEM;
+        String clientId = generateClientId(config);
+        ChannelBuilder channelBuilder = null;
+        Selector selector = null;
+        ApiVersions apiVersions = new ApiVersions();
+
+        try {
+            metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class);
+            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
+                .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+                .tags(metricTags);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            metrics = new Metrics(metricConfig, reporters, time);
+            String metricGrpPrefix = "admin-client";
+            channelBuilder = ClientUtils.createChannelBuilder(config);
+            selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                    metrics, time, metricGrpPrefix, channelBuilder);
+            networkClient = new NetworkClient(
+                selector,
+                metadata,
+                clientId,
+                100,
+                config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
+                config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
+                config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                time,
+                true,
+                apiVersions);
+            channelBuilder = null;
+            return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient);
+        } catch (Throwable exc) {
+            closeQuietly(metrics, "Metrics");
+            closeQuietly(networkClient, "NetworkClient");
+            closeQuietly(selector, "Selector");
+            closeQuietly(channelBuilder, "ChannelBuilder");
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
+        }
+    }
+
+    static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+        Metrics metrics = null;
+        Time time = Time.SYSTEM;
+        String clientId = generateClientId(config);
+
+        try {
+            metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
+            return new KafkaAdminClient(config, clientId, time, metadata, metrics, client);
+        } catch (Throwable exc) {
+            closeQuietly(metrics, "Metrics");
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
+        }
+    }
+
+    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
+                     Metrics metrics, KafkaClient client) {
+        this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.clientId = clientId;
+        this.time = time;
+        this.metadata = metadata;
+        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+        this.metrics = metrics;
+        this.client = client;
+        this.runnable = new AdminClientRunnable();
+        String threadName = "kafka-admin-client-thread" + (clientId.length() > 0 ? " | " + clientId : "");
+        this.thread = new KafkaThread(threadName, runnable, false);
+        config.logUnused();
+        log.debug("Created Kafka admin client {}", this.clientId);
+        thread.start();
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        client.wakeup(); // Wake the thread, if it is blocked inside poll().
+        try {
+            // Wait for the thread to be joined.
+            thread.join();
+            log.debug("{}: closed.", clientId);
+        } catch (InterruptedException e) {
+            log.debug("{}: interrupted while joining I/O thread", clientId, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * An interface for providing a node for a call.
+     */
+    private interface NodeProvider {
+        Node provide();
+    }
+
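+    // The implementations below target, respectively, a fixed node chosen up front,
+    // the current controller, and the least-loaded node.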
+    /**
+     * Provides a constant node which is known at construction time.
+     */
+    private static class ConstantAdminNodeProvider implements NodeProvider {
+        private final Node node;
+
+        ConstantAdminNodeProvider(Node node) {
+            this.node = node;
+        }
+
+        @Override
+        public Node provide() {
+            return node;
+        }
+    }
+
+    /**
+     * Provides the controller node.
+     */
+    private class ControllerNodeProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return metadata.fetch().controller();
+        }
+    }
+
+    /**
+     * Provides the least loaded node.
+     */
+    private class LeastLoadedNodeProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return client.leastLoadedNode(time.milliseconds());
+        }
+    }
+
+    private abstract class Call {
+        private final String callName;
+        private final long deadlineMs;
+        private final NodeProvider nodeProvider;
+        private int tries = 0;
+
+        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+            this.callName = callName;
+            this.deadlineMs = deadlineMs;
+            this.nodeProvider = nodeProvider;
+        }
+
+        /**
+         * Handle a failure.
+         *
+         * Depending on what the exception is and how many times we have already tried, we may choose to
+         * fail the Call, or retry it.  It is important to print the stack traces here in some cases,
+         * since they are not necessarily preserved in ApiException objects.
+         *
+         * @param now           The current time in milliseconds.
+         * @param throwable     The failure exception.
+         */
+        final void fail(long now, Throwable throwable) {
+            // If this is an UnsupportedVersionException that we can retry, do so.
+            if ((throwable instanceof UnsupportedVersionException) &&
+                     handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
+                log.trace("{} attempting protocol downgrade.", this);
+                runnable.call(this, now);
+                return;
+            }
+            tries++;
+            // If the call has timed out, fail.
+            if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            // If the exception is not retryable, fail.
+            if (!(throwable instanceof RetriableException)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            // If we are out of retries, fail.
+            if (tries > MAX_CALL_RETRIES) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} failed after {} attempt(s)", this, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("{} failed: {}.  Beginning retry #{}",
+                    this, prettyPrintException(throwable), tries);
+            }
+            runnable.call(this, now);
+        }
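+        // (Order of the checks above: version-downgrade retry first, then deadline,
+        // then retriability, then the MAX_CALL_RETRIES budget.)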
+
+        /**
+         * Create an AbstractRequest.Builder for this Call.
+         *
+         * @param timeoutMs The timeout in milliseconds.
+         *
+         * @return          The AbstractRequest builder.
+         */
+        abstract AbstractRequest.Builder createRequest(int timeoutMs);
+
+        /**
+         * Process the call response.
+         *
+         * @param abstractResponse  The AbstractResponse.
+         */
+        abstract void handleResponse(AbstractResponse abstractResponse);
+
+        /**
+         * Handle a failure.  This will only be called if the failure exception was not
+         * retryable, or if we hit a timeout.
+         *
+         * @param throwable     The exception.
+         */
+        abstract void handleFailure(Throwable throwable);
+
+        /**
+         * Handle an UnsupportedVersionException.
+         *
+         * @param exception     The exception.
+         *
+         * @return              True if the exception can be handled; false otherwise.
+         */
+        boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + ")";
+        }
+    }
+
+    private final class AdminClientRunnable implements Runnable {
+        /**
+         * Pending calls.  Protected by the object monitor.
+         */
+        private List<Call> newCalls = new LinkedList<>();
+
+        /**
+         * Check if the AdminClient metadata is ready.
+         * We need to know who the controller is, and have a non-empty view of the cluster.
+         *
+         * @param prevMetadataVersion       The previous metadata version which wasn't usable.
+         * @return                          null if the metadata is usable; the current metadata
+         *                                  version otherwise
+         */
+        private Integer checkMetadataReady(Integer prevMetadataVersion) {
+            if (prevMetadataVersion != null) {
+                if (prevMetadataVersion == metadata.version())
+                    return prevMetadataVersion;
+            }
+            Cluster cluster = metadata.fetch();
+            if (cluster.nodes().isEmpty()) {
+                log.trace("{}: metadata is not ready yet.  No cluster nodes found.", clientId);
+                return metadata.requestUpdate();
+            }
+            if (cluster.controller() == null) {
+                log.trace("{}: metadata is not ready yet.  No controller found.", clientId);
+                return metadata.requestUpdate();
+            }
+            if (prevMetadataVersion != null) {
+                log.trace("{}: metadata is now ready.", clientId);
+            }
+            return null;
+        }
+
+        /**
+         * Time out a list of calls.
+         *
+         * @param now       The current time in milliseconds.
+         * @param calls     The collection of calls.  Must be sorted from oldest to newest.
+         * @return          The number of calls which were timed out.
+         */
+        private int timeoutCalls(long now, Collection<Call> calls) {
+            int numTimedOut = 0;
+            for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
+                Call call = iter.next();
+                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+                    call.fail(now, new TimeoutException());
+                    iter.remove();
+                    numTimedOut++;
+                }
+            }
+            return numTimedOut;
+        }
+
+        /**
+         * Time out the elements in the newCalls list which are expired.
+         *
+         * @param now       The current time in milliseconds.
+         */
+        private synchronized void timeoutNewCalls(long now) {
+            int numTimedOut = timeoutCalls(now, newCalls);
+            if (numTimedOut > 0) {
+                log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
+            }
+        }
+
+        /**
+         * Time out calls which have been assigned to nodes.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         */
+        private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
+            int numTimedOut = 0;
+            for (List<Call> callList : callsToSend.values()) {
+                numTimedOut += timeoutCalls(now, callList);
+            }
+            if (numTimedOut > 0)
+                log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
+        }
+
+        /**
+         * Choose nodes for the new calls and add them to the callsToSend map.
+         *
+         * This function holds the lock for the minimum amount of time, to avoid blocking
+         * users of AdminClient who will also take the lock to add new calls.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         */
+        private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> callsToSend) {
+            List<Call> newCallsToAdd = null;
+            synchronized (this) {
+                if (newCalls.isEmpty()) {
+                    return;
+                }
+                newCallsToAdd = newCalls;
+                newCalls = new LinkedList<>();
+            }
+            for (Call call : newCallsToAdd) {
+                chooseNodeForNewCall(now, callsToSend, call);
+            }
+        }
+
+        /**
+         * Choose a node for a new call.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         * @param call          The call.
+         */
+        private void chooseNodeForNewCall(long now, Map<Node, List<Call>> callsToSend, Call call) {
+            Node node = call.nodeProvider.provide();
+            if (node == null) {
+                call.fail(now, new BrokerNotAvailableException(
+                    String.format("Error choosing node for %s: no node found.", call.callName)));
+                return;
+            }
+            log.trace("{}: assigned {} to {}", clientId, call, node);
+            getOrCreateListValue(callsToSend, node).add(call);
+        }
+
+        /**
+         * Send the calls which are ready.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param callsToSend           The calls to send, by node.
+         * @param correlationIdToCalls  A map of correlation IDs to calls.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         *
+         * @return                      The minimum timeout we need for poll().
+         */
+        private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend,
+                         Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) {
+            long pollTimeout = Long.MAX_VALUE;
+            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
+                     iter.hasNext(); ) {
+                Map.Entry<Node, List<Call>> entry = iter.next();
+                List<Call> calls = entry.getValue();
+                if (calls.isEmpty()) {
+                    iter.remove();
+                    continue;
+                }
+                Node node = entry.getKey();
+                if (!client.ready(node, now)) {
+                    long nodeTimeout = client.connectionDelay(node, now);
+                    pollTimeout = Math.min(pollTimeout, nodeTimeout);
+                    log.trace("{}: client is not ready to send to {}.  Must delay {} ms", clientId, node, nodeTimeout);
+                    continue;
+                }
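+                // Send at most one call per ready node per pass; any further calls for this
+                // node stay queued until the next iteration of the run loop.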
+                Call call = calls.remove(0);
+                int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+                AbstractRequest.Builder<?> requestBuilder = null;
+                try {
+                    requestBuilder = call.createRequest(timeoutMs);
+                } catch (Throwable throwable) {
+                    call.fail(now, new KafkaException(String.format(
+                        "Internal error sending %s to %s.", call.callName, node), throwable));
+                    continue;
+                }
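+                // The final 'true' flags the request as expecting a response from the broker.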
+                ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
+                log.trace("{}: sending {} to {}. correlationId={}", clientId, requestBuilder, node,
+                    clientRequest.correlationId());
+                client.send(clientRequest, now);
+                getOrCreateListValue(callsInFlight, node.idString()).add(call);
+                correlationIdToCalls.put(clientRequest.correlationId(), call);
+            }
+            return pollTimeout;
+        }
+
+        /**
+         * Time out expired calls that are in flight.
+         *
+         * Calls that are in flight may have been partially or completely sent over the wire.  They may
+         * even be in the process of being processed by the remote server.  At the moment, our only option
+         * to time them out is to close the entire connection.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         */
+        private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
+            int numTimedOut = 0;
+            for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
+                List<Call> contexts = entry.getValue();
+                if (contexts.isEmpty())
+                    continue;
+                String nodeId = entry.getKey();
+                // We assume that the first element in the list is the earliest.  So it should be the
+                // only one we need to check the timeout for.
+                Call call = contexts.get(0);
+                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+                    log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
+                    client.close(nodeId);
+                    numTimedOut++;
+                    // We don't remove anything from the callsInFlight data structure.  Because the connection
+                    // has been closed, the calls should be returned by the next client#poll(),
+                    // and handled at that point.
+                }
+            }
+            if (numTimedOut > 0)
+                log.debug("{}: timed out {} call(s) in flight.", clientId, numTimedOut);
+        }
+
+        /**
+         * Handle responses from the server.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param responses             The latest responses from KafkaClient.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         * @param correlationIdToCall   A map of correlation IDs to calls.
+         */
+        private void handleResponses(long now, List<ClientResponse> responses, Map<String, List<Call>> callsInFlight,
+                Map<Integer, Call> correlationIdToCall) {
+            for (ClientResponse response : responses) {
+                int correlationId = response.requestHeader().correlationId();
+
+                Call call = correlationIdToCall.get(correlationId);
+                if (call == null) {
+                    // If the server returns information about a correlation ID we didn't use yet,
+                    // an internal server error has occurred.  Close the connection and log an error message.
+                    log.error("Internal server error on {}: server returned information about unknown " +
+                        "correlation ID {}", response.destination(), correlationId);
+                    client.close(response.destination());
+                    continue;
+                }
+
+                // Stop tracking this call.
+                correlationIdToCall.remove(correlationId);
+                getOrCreateListValue(callsInFlight, response.destination()).remove(call);
+
+                // Handle the result of the call.  This may involve retrying the call, if we got a
+                // retriable exception.
+                if (response.versionMismatch() != null) {
+                    call.fail(now, response.versionMismatch());
+                } else if (response.wasDisconnected()) {
+                    call.fail(now, new DisconnectException(String.format(
+                        "Cancelled %s request with correlation id %s due to node %s being disconnected",
+                        call.callName, correlationId, response.destination())));
+                } else {
+                    try {
+                        call.handleResponse(response.responseBody());
+                        if (log.isTraceEnabled())
+                            log.trace("{}: {} got response {}", clientId, call, response.responseBody());
+                    } catch (Throwable t) {
+                        if (log.isTraceEnabled())
+                            log.trace("{}: {} handleResponse failed with {}", clientId, call, prettyPrintException(t));
+                        call.fail(now, t);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            /**
+             * Maps nodes to calls that we want to send.
+             */
+            Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+            /**
+             * Maps node ID strings to calls that have been sent.
+             */
+            Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+            /**
+             * Maps correlation IDs to calls that have been sent.
+             */
+            Map<Integer, Call> correlationIdToCalls = new HashMap<>();
+
+            /**
+             * The previous metadata version which wasn't usable, or null if there is none.
+             */
+            Integer prevMetadataVersion = null;
+
+            long now = time.milliseconds();
+            log.trace("{} thread starting", clientId);
+            while (true) {
+                // Check if the AdminClient is shutting down.
+                if (closed)
+                    break;
+
+                // Handle timeouts.
+                timeoutNewCalls(now);
+                timeoutCallsToSend(now, callsToSend);
+                timeoutCallsInFlight(now, callsInFlight);
+
+                // Handle new calls and metadata update requests.
+                prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
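+                // 1200000 ms (20 minutes) is an upper bound on how long poll() may block
+                // when there is nothing to send.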
+                long pollTimeout = 1200000;
+                if (prevMetadataVersion == null) {
+                    chooseNodesForNewCalls(now, callsToSend);
+                    pollTimeout = Math.min(pollTimeout,
+                        sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+                }
+
+                // Wait for network responses.
+                log.trace("{}: entering KafkaClient#poll(timeout={})", clientId, pollTimeout);
+                List<ClientResponse> responses = client.poll(pollTimeout, now);
+                log.trace("{}: KafkaClient#poll retrieved {} response(s)", clientId, responses.size());
+
+                // Update the current time and handle the latest responses.
+                now = time.milliseconds();
+                handleResponses(now, responses, callsInFlight, correlationIdToCalls);
+            }
+            int numTimedOut = 0;
+            synchronized (this) {
+                numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
+            }
+            numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
+            if (numTimedOut > 0) {
+                log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
+            }
+            closeQuietly(client, "KafkaClient");
+            closeQuietly(metrics, "Metrics");
+            log.debug("{}: exiting AdminClientRunnable thread.", clientId);
+        }
+
+        void call(Call call, long now) {
+            if (log.isDebugEnabled()) {
+                log.debug("{}: queueing {} with a timeout {} ms from now.",
+                    clientId, call, call.deadlineMs - now);
+            }
+            synchronized (this) {
+                newCalls.add(call);
+            }
+            client.wakeup();
+        }
+    }
+
+    @Override
+    public CreateTopicResults createTopics(final Collection<NewTopic> newTopics,
+                                           final CreateTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
+        for (NewTopic newTopic : newTopics) {
+            topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+        }
+        final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
+        for (NewTopic newTopic : newTopics) {
+            topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly());
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                    } else {
+                        ApiException exception = entry.getValue().exception();
+                        if (exception != null) {
+                            future.completeExceptionally(exception);
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                }
+                // The server should send back a response for every topic.  But do a sanity check anyway.
+                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    if (!future.isDone()) {
+                        future.completeExceptionally(new ApiException("The server response did not " +
+                            "contain a reference to topic " + entry.getKey()));
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+    }
+
+    @Override
+    public DeleteTopicResults deleteTopics(final Collection<String> topicNames,
+                                           DeleteTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+        for (String topicName : topicNames) {
+            topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                    } else {
+                        ApiException exception = entry.getValue().exception();
+                        if (exception != null) {
+                            future.completeExceptionally(exception);
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                }
+                // The server should send back a response for every topic.  But do a sanity check anyway.
+                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    if (!future.isDone()) {
+                        future.completeExceptionally(new ApiException("The server response did not " +
+                            "contain a reference to topic " + entry.getKey()));
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+    }
+
+    @Override
+    public ListTopicsResults listTopics(final ListTopicsOptions options) {
+        final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return MetadataRequest.Builder.allTopics();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                Cluster cluster = response.cluster();
+                Map<String, TopicListing> topicListing = new HashMap<>();
+                for (String topicName : cluster.topics()) {
+                    boolean internal = cluster.internalTopics().contains(topicName);
+                    if (!internal || options.listInternal())
+                        topicListing.put(topicName, new TopicListing(topicName, internal));
+                }
+                topicListingFuture.complete(topicListing);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                topicListingFuture.completeExceptionally(throwable);
+            }
+        }, now);
+        return new ListTopicsResults(topicListingFuture);
+    }
+
+    @Override
+    public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        for (String topicName : topicNames) {
+            topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new ArrayList<>(topicNames));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                    String topicName = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
+                    Errors topicError = response.errors().get(topicName);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+                    Cluster cluster = response.cluster();
+                    if (!cluster.topics().contains(topicName)) {
+                        future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found."));
+                        continue;
+                    }
+                    boolean isInternal = cluster.internalTopics().contains(topicName);
+                    TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>();
+                    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                            partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()),
+                            Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.put(partitionInfo.partition(), topicPartitionInfo);
+                    }
+                    TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
+                    future.complete(topicDescription);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+    }
+
+    @Override
+    public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
+        final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(Collections.<String>emptyList());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                describeClusterFuture.complete(response.brokers());
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                describeClusterFuture.completeExceptionally(throwable);
+            }
+        }, now);
+        return new DescribeClusterResults(describeClusterFuture);
+    }
+
+    @Override
+    public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
+        final long now = time.milliseconds();
+        final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
+        Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
+        for (final Node node : nodes) {
+            final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
+            nodeFutures.put(node, nodeFuture);
+            runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) {
+                    @Override
+                    public AbstractRequest.Builder createRequest(int timeoutMs) {
+                        return new ApiVersionsRequest.Builder();
+                    }
+
+                    @Override
+                    public void handleResponse(AbstractResponse abstractResponse) {
+                        ApiVersionsResponse response = (ApiVersionsResponse) abstractResponse;
+                        nodeFuture.complete(new NodeApiVersions(response.apiVersions()));
+                    }
+
+                    @Override
+                    public void handleFailure(Throwable throwable) {
+                        nodeFuture.completeExceptionally(throwable);
+                    }
+                }, now);
+        }
+        return new ApiVersionsResults(nodeFutures);
+    }
+}
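
A minimal end-to-end sketch of the calls implemented above, assuming a broker
reachable at localhost:9092; BOOTSTRAP_SERVERS_CONFIG is assumed to be defined
in AdminClientConfig (that file is not shown in this hunk):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminClientSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient client = AdminClient.create(conf)) {
                // The returned CreateTopicResults wraps one KafkaFuture<Void> per topic.
                client.createTopics(Collections.singleton(
                    new NewTopic("demo-topic", 3, (short) 1)));
                // Brokers may take a few seconds to learn about new topics, so the
                // listing below is not guaranteed to include demo-topic yet.
                Collection<String> names =
                    client.listTopics(new ListTopicsOptions().timeoutMs(10000)).names().get();
                System.out.println("Topics: " + names);
            }
        }
    }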

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
new file mode 100644
index 0000000..02a0a40
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for listTopics.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsOptions {
+    private Integer timeoutMs = null;
+    private boolean listInternal = false;
+
+    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+    /**
+     * Set whether we should list internal topics.
+     *
+     * @param listInternal  Whether we should list internal topics.  By default,
+     *                      internal topics are not listed.
+     * @return              This ListTopicsOptions object.
+     */
+    public ListTopicsOptions listInternal(boolean listInternal) {
+        this.listInternal = listInternal;
+        return this;
+    }
+
+    public boolean listInternal() {
+        return listInternal;
+    }
+}
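
The options classes use chainable setters, so call sites stay compact. A small
fragment (__consumer_offsets is one example of an internal topic):

    // Fail the call after 5 seconds and include internal topics in the listing.
    ListTopicsOptions options = new ListTopicsOptions()
        .timeoutMs(5000)
        .listInternal(true);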

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
new file mode 100644
index 0000000..7e9448d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the listTopics call.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsResults {
+    final KafkaFuture<Map<String, TopicListing>> future;
+
+    ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a map of topic names to TopicListing objects.
+     */
+    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+        return future;
+    }
+
+    /**
+     * Return a future which yields a collection of TopicListing objects.
+     */
+    public KafkaFuture<Collection<TopicListing>> descriptions() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+            @Override
+            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.values();
+            }
+        });
+    }
+
+    /**
+     * Return a future which yields a collection of topic names.
+     */
+    public KafkaFuture<Collection<String>> names() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+            @Override
+            public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.keySet();
+            }
+        });
+    }
+}
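
Since names() and descriptions() are thenApply() views over the same underlying
future, all three accessors complete together. A fragment, assuming an
AdminClient named client is in scope:

    KafkaFuture<Map<String, TopicListing>> byName =
        client.listTopics(new ListTopicsOptions()).namesToDescriptions();
    for (TopicListing listing : byName.get().values())
        System.out.println(listing);   // prints e.g. (name=foo, internal=false)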

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
new file mode 100644
index 0000000..a1f6fb5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A request to create a new topic through the AdminClient API. 
+ */
+public class NewTopic {
+    private final String name;
+    private final int numPartitions;
+    private final short replicationFactor;
+    private final Map<Integer, List<Integer>> replicasAssignments;
+    private Map<String, String> configs = null;
+
+    /**
+     * Create a new topic with a fixed replication factor and number of partitions.
+     */
+    public NewTopic(String name, int numPartitions, short replicationFactor) {
+        this.name = name;
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.replicasAssignments = null;
+    }
+
+    /**
+     * A request to create a new topic with a specific replica assignment configuration.
+     */
+    public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
+        this.name = name;
+        this.numPartitions = -1;
+        this.replicationFactor = -1;
+        this.replicasAssignments = replicasAssignments;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the configuration to use on the new topic.
+     *
+     * @param configs               The configuration map.
+     * @return                      This NewTopic object.
+     */
+    public NewTopic configs(Map<String, String> configs) {
+        this.configs = configs;
+        return this;
+    }
+
+    TopicDetails convertToTopicDetails() {
+        if (replicasAssignments != null) {
+            if (configs != null) {
+                return new TopicDetails(replicasAssignments, configs);
+            } else {
+                return new TopicDetails(replicasAssignments);
+            }
+        } else {
+            if (configs != null) {
+                return new TopicDetails(numPartitions, replicationFactor, configs);
+            } else {
+                return new TopicDetails(numPartitions, replicationFactor);
+            }
+        }
+    }
+}
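
Both construction styles in a fragment (java.util imports assumed; retention.ms
is a standard per-topic config):

    // Let the controller place replicas: 8 partitions, replication factor 3.
    NewTopic auto = new NewTopic("events", 8, (short) 3);

    // Or pin replicas explicitly: partition 0 on brokers 1 and 2,
    // partition 1 on brokers 2 and 3.
    Map<Integer, List<Integer>> assignment = new HashMap<>();
    assignment.put(0, Arrays.asList(1, 2));
    assignment.put(1, Arrays.asList(2, 3));
    NewTopic manual = new NewTopic("events-manual", assignment);

    // Per-topic configs can be attached to either form.
    auto.configs(Collections.singletonMap("retention.ms", "86400000"));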

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
new file mode 100644
index 0000000..2fc4442
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.NavigableMap;
+
+/**
+ * A detailed description of a single topic in the cluster.
+ */
+public class TopicDescription {
+    private final String name;
+    private final boolean internal;
+    private final NavigableMap<Integer, TopicPartitionInfo> partitions;
+
+    TopicDescription(String name, boolean internal,
+                    NavigableMap<Integer, TopicPartitionInfo> partitions) {
+        this.name = name;
+        this.internal = internal;
+        this.partitions = partitions;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public boolean internal() {
+        return internal;
+    }
+
+    public NavigableMap<Integer, TopicPartitionInfo> partitions() {
+        return partitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(name=" + name + ", internal=" + internal + ", partitions=" +
+            Utils.mkString(partitions, "[", "]", "=", ",") + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
new file mode 100644
index 0000000..4c25551
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A listing of a topic in the cluster.
+ */
+public class TopicListing {
+    private final String name;
+    private final boolean internal;
+
+    TopicListing(String name, boolean internal) {
+        this.name = name;
+        this.internal = internal;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public boolean internal() {
+        return internal;
+    }
+
+    @Override
+    public String toString() {
+        return "(name=" + name + ", internal=" + internal + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
new file mode 100644
index 0000000..b304802
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+public class TopicPartitionInfo {
+    private final int partition;
+    private final Node leader;
+    private final List<Node> replicas;
+    private final List<Node> isr;
+
+    TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
+        this.partition = partition;
+        this.leader = leader;
+        this.replicas = replicas;
+        this.isr = isr;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public Node leader() {
+        return leader;
+    }
+
+    public List<Node> replicas() {
+        return replicas;
+    }
+
+    public List<Node> isr() {
+        return isr;
+    }
+
+    public String toString() {
+        return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ba1d2af..6619b4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -37,6 +37,7 @@ public final class Cluster {
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> internalTopics;
+    private final Node controller;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
     private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -54,7 +55,7 @@ public final class Cluster {
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
-        this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
+        this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), null);
     }
 
 
@@ -68,7 +69,21 @@ public final class Cluster {
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+    }
+
+    /**
+     * Create a new cluster with the given id, nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(String clusterId,
+                   Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics,
+                   Set<String> internalTopics,
+                   Node controller) {
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
     }
 
     private Cluster(String clusterId,
@@ -76,7 +91,8 @@ public final class Cluster {
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
-                    Set<String> internalTopics) {
+                    Set<String> internalTopics,
+                    Node controller) {
         this.isBootstrapConfigured = isBootstrapConfigured;
         this.clusterResource = new ClusterResource(clusterId);
         // make a randomized, unmodifiable copy of the nodes
@@ -130,6 +146,7 @@ public final class Cluster {
 
         this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
         this.internalTopics = Collections.unmodifiableSet(internalTopics);
+        this.controller = controller;
     }
 
     /**
@@ -137,7 +154,7 @@ public final class Cluster {
      */
     public static Cluster empty() {
         return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.<String>emptySet(), null);
     }
 
     /**
@@ -150,7 +167,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
     }
 
     /**
@@ -160,7 +177,7 @@ public final class Cluster {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
         return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
-                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics));
+                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
     }
 
     /**
@@ -265,6 +282,10 @@ public final class Cluster {
         return clusterResource;
     }
 
+    public Node controller() {
+        return controller;
+    }
+
     @Override
     public String toString() {
         return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
new file mode 100644
index 0000000..3c51fbe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ */
+public abstract class KafkaFuture<T> implements Future<T> {
+    /**
+     * A function which takes objects of type A and returns objects of type B.
+     */
+    public static abstract class Function<A, B> {
+        public abstract B apply(A a);
+    }
+
+    /**
+     * A consumer of two different types of object.
+     */
+    public static abstract class BiConsumer<A, B> {
+        public abstract void accept(A a, B b);
+    }
+
+    private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+        private int remainingResponses;
+        private KafkaFuture future;
+
+        public AllOfAdapter(int remainingResponses, KafkaFuture future) {
+            this.remainingResponses = remainingResponses;
+            this.future = future;
+        }
+
+        @Override
+        public synchronized void accept(R newValue, Throwable exception) {
+            if (remainingResponses <= 0)
+                return;
+            if (exception != null) {
+                remainingResponses = 0;
+                future.completeExceptionally(exception);
+            } else {
+                remainingResponses--;
+                if (remainingResponses <= 0)
+                    future.complete(null);
+            }
+        }
+    }
+
+    /** 
+     * Returns a new KafkaFuture that is already completed with the given value.
+     */
+    public static <U> KafkaFuture<U> completedFuture(U value) {
+        KafkaFuture<U> future = new KafkaFutureImpl<U>();
+        future.complete(value);
+        return future;
+    }
+
+    /** 
+     * Returns a new KafkaFuture that is completed when all the given futures have completed.  If
+     * any future throws an exception, the returned future returns it.  If multiple futures throw
+     * an exception, which one gets returned is arbitrarily chosen.
+     */
+    public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
+        KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<Void>();
+        AllOfAdapter allOfWaiter = new AllOfAdapter(futures.length, allOfFuture);
+        for (KafkaFuture<?> future : futures) {
+            future.addWaiter(allOfWaiter);
+        }
+        return allOfFuture;
+    }
+
+    /**
+     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+     * futures's result as the argument to the supplied function.
+     */
+    public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
+
+    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+
+    /**
+     * If not already completed, sets the value returned by get() and related methods to the given
+     * value.
+     */
+    protected abstract boolean complete(T newValue);
+
+    /**
+     * If not already completed, causes invocations of get() and related methods to throw the given
+     * exception.
+     */
+    protected abstract boolean completeExceptionally(Throwable newException);
+
+    /**
+     * If not already completed, completes this future with a CancellationException.  Dependent
+     * futures that have not already completed will also complete exceptionally, with a
+     * CompletionException caused by this CancellationException.
+     */
+    @Override
+    public abstract boolean cancel(boolean mayInterruptIfRunning);
+
+    /**
+     * Waits if necessary for this future to complete, and then returns its result.
+     */
+    @Override
+    public abstract T get() throws InterruptedException, ExecutionException;
+
+    /**
+     * Waits if necessary for at most the given time for this future to complete, and then returns
+     * its result, if available.
+     */
+    @Override
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+        TimeoutException;
+
+    /**
+     * Returns the result value (or throws any encountered exception) if completed, else returns
+     * the given valueIfAbsent.
+     */
+    public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;
+
+    /**
+     * Returns true if this KafkaFuture was cancelled before it completed normally.
+     */
+    @Override
+    public abstract boolean isCancelled();
+
+    /**
+     * Returns true if this KafkaFuture completed exceptionally, in any way.
+     */
+    public abstract boolean isCompletedExceptionally();
+
+    /**
+     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+     */
+    @Override
+    public abstract boolean isDone();
+}
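
A runnable sketch of the chaining primitives; Function is subclassed
anonymously, presumably to stay Java 7 compatible:

    import org.apache.kafka.common.KafkaFuture;

    public class KafkaFutureSketch {
        public static void main(String[] args) throws Exception {
            KafkaFuture<Integer> length = KafkaFuture.completedFuture("hello")
                .thenApply(new KafkaFuture.Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) {
                        return s.length();   // runs when the parent future completes
                    }
                });
            KafkaFuture.allOf(length).get();          // blocks until all inputs are done
            System.out.println(length.getNow(-1));    // prints 5
        }
    }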

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
new file mode 100644
index 0000000..01355c6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ * This will eventually become a thin shim on top of Java 8's CompletableFuture.
+ */
+public class KafkaFutureImpl<T> extends KafkaFuture<T> {
+    /**
+     * A convenience method that throws the current exception, wrapping it if needed.
+     *
+     * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
+     * wraps all other exceptions in an ExecutionException.
+     */
+    private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
+        if (t instanceof CancellationException) {
+            throw (CancellationException) t;
+        } else if (t instanceof InterruptedException) {
+            throw (InterruptedException) t;
+        } else {
+            throw new ExecutionException(t);
+        }
+    }
+
+    private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
+        private final Function<A, B> function;
+        private final KafkaFutureImpl<B> future;
+
+        Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+            this.function = function;
+            this.future = future;
+        }
+
+        @Override
+        public void accept(A a, Throwable exception) {
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                try {
+                    B b = function.apply(a);
+                    future.complete(b);
+                } catch (Throwable t) {
+                    future.completeExceptionally(t);
+                }
+            }
+        }
+    }
+
+    private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+        private R value = null;
+        private Throwable exception = null;
+        private boolean done = false;
+
+        @Override
+        public synchronized void accept(R newValue, Throwable newException) {
+            this.value = newValue;
+            this.exception = newException;
+            this.done = true;
+            this.notifyAll();
+        }
+
+        synchronized R await() throws InterruptedException, ExecutionException {
+            while (true) {
+                if (exception != null)
+                    wrapAndThrow(exception);
+                if (done)
+                    return value;
+                this.wait();
+            }
+        }
+
+        R await(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            long startMs = System.currentTimeMillis();
+            long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
+            long delta = 0;
+            synchronized (this) {
+                while (true) {
+                    if (exception != null)
+                        wrapAndThrow(exception);
+                    if (done)
+                        return value;
+                    if (delta > waitTimeMs) {
+                        throw new TimeoutException();
+                    }
+                    this.wait(waitTimeMs - delta);
+                    delta = System.currentTimeMillis() - startMs;
+                }
+            }
+        }
+    }
+
+    /**
+     * True if this future is done.
+     */
+    private boolean done = false;
+
+    /**
+     * The value of this future, or null.  Protected by the object monitor.
+     */
+    private T value = null;
+
+    /**
+     * The exception associated with this future, or null.  Protected by the object monitor.
+     */
+    private Throwable exception = null;
+
+    /**
+     * A list of objects waiting for this future to complete (either successfully or
+     * exceptionally).  Protected by the object monitor.
+     */
+    private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>();
+
+    /**
+     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+     * future's result as the argument to the supplied function.
+     */
+    @Override
+    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+        KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
+        addWaiter(new Applicant<>(function, future));
+        return future;
+    }
+
+    @Override
+    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+        if (exception != null) {
+            action.accept(null, exception);
+        } else if (done) {
+            action.accept(value, null);
+        } else {
+            waiters.add(action);
+        }
+    }
+
+    @Override
+    public boolean complete(T newValue) {
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        synchronized (this) {
+            if (done)
+                return false;
+            value = newValue;
+            done = true;
+            oldWaiters = waiters;
+            waiters = null;
+        }
+        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+            waiter.accept(newValue, null);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean completeExceptionally(Throwable newException) {
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        synchronized (this) {
+            if (done)
+                return false;
+            exception = newException;
+            done = true;
+            oldWaiters = waiters;
+            waiters = null;
+        }
+        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+            waiter.accept(null, newException);
+        }
+        return true;
+    }
+
+    /**
+     * If not already completed, completes this future with a CancellationException.  Dependent
+     * futures that have not already completed will also complete exceptionally, with a
+     * CompletionException caused by this CancellationException.
+     */
+    @Override
+    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        if (completeExceptionally(new CancellationException()))
+            return true;
+        return exception instanceof CancellationException;
+    }
+
+    /**
+     * Waits if necessary for this future to complete, and then returns its result.
+     */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        addWaiter(waiter);
+        return waiter.await();
+    }
+
+    /**
+     * Waits if necessary for at most the given time for this future to complete, and then returns
+     * its result, if available.
+     */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+            TimeoutException {
+        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        addWaiter(waiter);
+        return waiter.await(timeout, unit);
+    }
+
+    /**
+     * Returns the result value (or throws any encountered exception) if completed, else returns
+     * the given valueIfAbsent.
+     */
+    @Override
+    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
+        if (exception != null)
+            wrapAndThrow(exception);
+        if (done)
+            return value;
+        return valueIfAbsent;
+    }
+
+    /**
+     * Returns true if this KafkaFuture was cancelled before it completed normally.
+     */
+    @Override
+    public synchronized boolean isCancelled() {
+        return (exception != null) && (exception instanceof CancellationException);
+    }
+
+    /**
+     * Returns true if this KafkaFuture completed exceptionally, in any way.
+     */
+    @Override
+    public synchronized boolean isCompletedExceptionally() {
+        return exception != null;
+    }
+
+    /**
+     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+     */
+    @Override
+    public synchronized boolean isDone() {
+        return done;
+    }
+}
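
The waiter mechanics above are easy to exercise directly, since complete() is
public on the implementation class; a runnable sketch:

    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.internals.KafkaFutureImpl;

    public class KafkaFutureImplSketch {
        public static void main(String[] args) throws Exception {
            KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
            KafkaFuture<Integer> length = future.thenApply(
                new KafkaFuture.Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) {
                        return s.length();
                    }
                });
            future.complete("done");                      // first completion wins...
            System.out.println(future.complete("late"));  // ...so this prints false
            System.out.println(length.get());             // prints 4
        }
    }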

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 303d76f..0e5ca78 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.KafkaException;
 /**
  * A ChannelBuilder interface to build Channel based on configs
  */
-public interface ChannelBuilder {
+public interface ChannelBuilder extends AutoCloseable {
 
     /**
      * Configure this class with the given key-value pairs

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8dd3ad6..312e1f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
  *
  * This class is not thread safe!
  */
-public class Selector implements Selectable {
+public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
     private static final Logger log = LoggerFactory.getLogger(Selector.class);


[3/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Posted by ij...@apache.org.
KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Dan Norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2472 from cmccabe/KAFKA-3265


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4aed28d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4aed28d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4aed28d1

Branch: refs/heads/trunk
Commit: 4aed28d1897c6c5293f372cb4fc44ab363dfc365
Parents: c96656e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 2 00:16:01 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 2 00:20:22 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |    5 +
 checkstyle/suppressions.xml                     |    4 +-
 .../java/org/apache/kafka/clients/Metadata.java |    4 +-
 .../apache/kafka/clients/admin/AdminClient.java |  186 +++
 .../kafka/clients/admin/AdminClientConfig.java  |  163 +++
 .../kafka/clients/admin/ApiVersionsOptions.java |   37 +
 .../kafka/clients/admin/ApiVersionsResults.java |   63 ++
 .../kafka/clients/admin/CreateTopicResults.java |   49 +
 .../clients/admin/CreateTopicsOptions.java      |   47 +
 .../kafka/clients/admin/DeleteTopicResults.java |   50 +
 .../clients/admin/DeleteTopicsOptions.java      |   37 +
 .../clients/admin/DescribeClusterOptions.java   |   37 +
 .../clients/admin/DescribeClusterResults.java   |   43 +
 .../clients/admin/DescribeTopicsOptions.java    |   37 +
 .../clients/admin/DescribeTopicsResults.java    |   68 ++
 .../kafka/clients/admin/KafkaAdminClient.java   | 1065 ++++++++++++++++++
 .../kafka/clients/admin/ListTopicsOptions.java  |   54 +
 .../kafka/clients/admin/ListTopicsResults.java  |   67 ++
 .../apache/kafka/clients/admin/NewTopic.java    |   85 ++
 .../kafka/clients/admin/TopicDescription.java   |   56 +
 .../kafka/clients/admin/TopicListing.java       |   44 +
 .../kafka/clients/admin/TopicPartitionInfo.java |   58 +
 .../java/org/apache/kafka/common/Cluster.java   |   33 +-
 .../org/apache/kafka/common/KafkaFuture.java    |  155 +++
 .../kafka/common/internals/KafkaFutureImpl.java |  264 +++++
 .../kafka/common/network/ChannelBuilder.java    |    2 +-
 .../apache/kafka/common/network/Selector.java   |    2 +-
 .../apache/kafka/common/protocol/Errors.java    |  507 +++++++--
 .../common/requests/CreateTopicsResponse.java   |    5 +
 .../kafka/common/requests/MetadataResponse.java |    3 +-
 .../org/apache/kafka/common/utils/Utils.java    |    2 +-
 .../clients/admin/KafkaAdminClientTest.java     |  206 ++++
 .../apache/kafka/common/KafkaFutureTest.java    |  164 +++
 .../main/scala/kafka/admin/AdminClient.scala    |    5 +
 .../integration/kafka/api/AdminClientTest.scala |  263 -----
 .../api/KafkaAdminClientIntegrationTest.scala   |  162 +++
 .../kafka/api/LegacyAdminClientTest.scala       |  266 +++++
 .../api/SaslSslAdminClientIntegrationTest.scala |   26 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |    3 +
 39 files changed, 3943 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d7851a5..d40c4d4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -47,6 +47,7 @@
   <subpackage name="common">
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.internals" exact-match="true" />
     <allow pkg="org.apache.kafka.test" />
 
     <subpackage name="config">
@@ -134,6 +135,10 @@
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.clients.producer" />
     </subpackage>
+
+    <subpackage name="admin">
+      <allow pkg="org.apache.kafka.clients.admin" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="server">

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index eae8dde..dd41f94 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/>
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -35,7 +35,7 @@
               files="DefaultRecordBatch.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/>
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*/protocol/Errors.java"/>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5bfdb64..9ff629d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -332,6 +332,7 @@ public final class Metadata {
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
+        Node controller = null;
         String clusterId = null;
         if (cluster != null) {
             clusterId = cluster.clusterResource().clusterId();
@@ -346,7 +347,8 @@ public final class Metadata {
                 }
             }
             nodes = cluster.nodes();
+            controller = cluster.controller();
         }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics);
+        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, internalTopics, controller);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
new file mode 100644
index 0000000..a97219b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics,
+ * brokers, and configurations.
+ *
+ * @see KafkaAdminClient
+ */
+@InterfaceStability.Unstable
+public abstract class AdminClient implements AutoCloseable {
+    /**
+     * Create a new AdminClient with the given configuration.
+     *
+     * @param conf          The configuration.
+     * @return              The new KafkaAdminClient.
+     */
+    public static AdminClient create(Map<String, Object> conf) {
+        return KafkaAdminClient.create(new AdminClientConfig(conf));
+    }
+
+    /**
+     * Close the AdminClient and release all associated resources.
+     */
+    public abstract void close();
+
+    /**
+     * Create a batch of new topics with the default options.
+     *
+     * @param newTopics         The new topics to create.
+     * @return                  The CreateTopicResults.
+     */
+    public CreateTopicResults createTopics(Collection<NewTopic> newTopics) {
+        return createTopics(newTopics, new CreateTopicsOptions());
+    }
+
+    /**
+     * Create a batch of new topics.
+     *
+     * It may take several seconds after AdminClient#createTopics returns
+     * success for all the brokers to become aware that the topics have been created.
+     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * may not return information about the new topics.
+     *
+     * @param newTopics         The new topics to create.
+     * @param options           The options to use when creating the new topics.
+     * @return                  The CreateTopicResults.
+     */
+    public abstract CreateTopicResults createTopics(Collection<NewTopic> newTopics,
+                                                    CreateTopicsOptions options);
+
+    /**
+     * Similar to {@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)},
+     * but uses the default options.
+     *
+     * @param topics            The topic names to delete.
+     * @return                  The DeleteTopicResults.
+     */
+    public DeleteTopicResults deleteTopics(Collection<String> topics) {
+        return deleteTopics(topics, new DeleteTopicsOptions());
+    }
+
+    /**
+     * Delete a batch of topics.
+     *
+     * It may take several seconds after AdminClient#deleteTopics returns
+     * success for all the brokers to become aware that the topics are gone.
+     * During this time, AdminClient#listTopics and AdminClient#describeTopics
+     * may continue to return information about the deleted topics.
+     *
+     * If delete.topic.enable is false on the brokers, deleteTopics will mark
+     * the topics for deletion, but not actually delete them.  The futures will
+     * return successfully in this case.
+     *
+     * @param topics            The topic names to delete.
+     * @param options           The options to use when deleting the topics.
+     * @return                  The DeleteTopicResults.
+     */
+    public abstract DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
+
+    /**
+     * List the topics available in the cluster with the default options.
+     *
+     * @return                  The ListTopicsResults.
+     */
+    public ListTopicsResults listTopics() {
+        return listTopics(new ListTopicsOptions());
+    }
+
+    /**
+     * List the topics available in the cluster.
+     *
+     * @param options           The options to use when listing the topics.
+     * @return                  The ListTopicsResults.
+     */
+    public abstract ListTopicsResults listTopics(ListTopicsOptions options);
+
+    /**
+     * Describe some topics in the cluster, with the default options.
+     *
+     * See {@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)}.
+     *
+     * @param topicNames        The names of the topics to describe.
+     *
+     * @return                  The DescribeTopicsResults.
+     */
+    public DescribeTopicsResults describeTopics(Collection<String> topicNames) {
+        return describeTopics(topicNames, new DescribeTopicsOptions());
+    }
+
+    /**
+     * Describe some topics in the cluster.
+     *
+     * Note that if auto.create.topics.enable is true on the brokers,
+     * AdminClient#describeTopics(topicNames) may create topics with the given names.
+     * There are two workarounds: either use AdminClient#listTopics and ensure
+     * that the topic is present before describing, or disable
+     * auto.create.topics.enable.
+     *
+     * @param topicNames        The names of the topics to describe.
+     * @param options           The options to use when describing the topics.
+     *
+     * @return                  The DescribeTopicsResults.
+     */
+    public abstract DescribeTopicsResults describeTopics(Collection<String> topicNames,
+                                                         DescribeTopicsOptions options);
+
+    /**
+     * Get information about the nodes in the cluster, using the default options.
+     *
+     * @return                  The DescribeClusterResults.
+     */
+    public DescribeClusterResults describeCluster() {
+        return describeCluster(new DescribeClusterOptions());
+    }
+
+    /**
+     * Get information about the nodes in the cluster.
+     *
+     * @param options           The options to use when getting information about the cluster.
+     * @return                  The DescribeClusterResults.
+     */
+    public abstract DescribeClusterResults describeCluster(DescribeClusterOptions options);
+
+    /**
+     * Get information about the API versions of nodes in the cluster, with the default options.
+     * See {@link AdminClient#apiVersions(Collection, ApiVersionsOptions)}.
+     *
+     * @param nodes             The nodes to get information about, or null to get information about all nodes.
+     * @return                  The ApiVersionsResults.
+     */
+    public ApiVersionsResults apiVersions(Collection<Node> nodes) {
+        return apiVersions(nodes, new ApiVersionsOptions());
+    }
+
+    /**
+     * Get information about the API versions of nodes in the cluster.
+     *
+     * @param nodes             The nodes to get information about, or null to get information about all nodes.
+     * @param options           The options to use when getting api versions of the nodes.
+     * @return                  The ApiVersionsResults.
+     */
+    public abstract ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
+}
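
As a quick orientation to the API above, here is a minimal usage sketch. The NewTopic
class is added elsewhere in this commit; its (name, numPartitions, replicationFactor)
constructor is assumed here rather than shown in this excerpt:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class AdminClientUsage {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put("bootstrap.servers", "localhost:9092");
            // AdminClient implements AutoCloseable, so try-with-resources applies.
            try (AdminClient admin = AdminClient.create(conf)) {
                // Assumed constructor: NewTopic(name, numPartitions, replicationFactor).
                NewTopic topic = new NewTopic("my-topic", 1, (short) 1);
                // all() collapses the per-topic futures; get() blocks until done.
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }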

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
new file mode 100644
index 0000000..368a42e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * The AdminClient configuration keys.
+ */
+public class AdminClientConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /**
+     * <code>bootstrap.servers</code>
+     */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+    private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
+
+    /**
+     * <code>reconnect.backoff.ms</code>
+     */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+    private static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+
+    /**
+     * <code>retry.backoff.ms</code>
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " +
+                "retry a failed request. This avoids repeatedly sending requests in a tight loop under " +
+                "some failure scenarios.";
+
+    /** <code>connections.max.idle.ms</code> */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+    private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+
+    /** <code>request.timeout.ms</code> */
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+    private static final String CLIENT_ID_DOC = CommonClientConfigs.CLIENT_ID_DOC;
+
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
+    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+    private static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
+
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+    private static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+    private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
+
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+    private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
+
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC;
+
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
+    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+    private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+    private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+
+    static {
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+                                        Type.LIST,
+                                        Importance.HIGH,
+                                        BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, atLeast(-1), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        50L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        RECONNECT_BACKOFF_MS_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        100L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        RETRY_BACKOFF_MS_DOC)
+                                .define(REQUEST_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        120000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        REQUEST_TIMEOUT_MS_DOC)
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        5 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+                                .define(METRICS_RECORDING_LEVEL_CONFIG,
+                                    Type.STRING,
+                                    Sensor.RecordingLevel.INFO.toString(),
+                                    in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                    Importance.LOW,
+                                    METRICS_RECORDING_LEVEL_DOC)
+                                // security support
+                                .define(SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
+    }
+
+    AdminClientConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public static Set<String> configNames() {
+        return CONFIG.names();
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+
+}
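
For illustration, a configuration map built from the keys above; bootstrap.servers is
the only setting without a workable default, and the other entries merely override
defaults (a fragment, reusing the imports from the sketch after AdminClient.java):

    Map<String, Object> conf = new HashMap<>();
    conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);  // default: 120000
    conf.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 200L);     // default: 100
    AdminClient admin = AdminClient.create(conf);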

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
new file mode 100644
index 0000000..cbcd234
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsOptions {
+    private Integer timeoutMs = null;
+
+    public ApiVersionsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
new file mode 100644
index 0000000..456c64d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ApiVersionsResults.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Results of the apiVersions call.
+ */
+@InterfaceStability.Unstable
+public class ApiVersionsResults {
+    private final Map<Node, KafkaFuture<NodeApiVersions>> futures;
+
+    ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures) {
+        this.futures = futures;
+    }
+
+    public Map<Node, KafkaFuture<NodeApiVersions>> results() {
+        return futures;
+    }
+
+    public KafkaFuture<Map<Node, NodeApiVersions>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<Node, NodeApiVersions>>() {
+                @Override
+                public Map<Node, NodeApiVersions> apply(Void v) {
+                    Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
+                    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : futures.entrySet()) {
+                        try {
+                            versions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return versions;
+                }
+            });
+    }
+}
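
Continuing the usage sketch (admin is an AdminClient instance), the two accessors above
support per-node handling as well as a single blocking call; note that get() throws the
usual checked future exceptions:

    ApiVersionsResults results = admin.apiVersions(null);  // null selects all nodes
    for (Map.Entry<Node, KafkaFuture<NodeApiVersions>> entry : results.results().entrySet())
        System.out.println(entry.getKey() + " -> " + entry.getValue().get());
    // Or collapse everything into a single future:
    Map<Node, NodeApiVersions> allVersions = results.all().get();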

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
new file mode 100644
index 0000000..03da7d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicResults.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the createTopics call.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicResults {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    CreateTopicResults(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures, which can be used to check the status of individual
+     * topic creations.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds if all the topic creations succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
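
The per-topic futures make partial failure visible, e.g. when one name in the batch
already exists. A sketch (admin and newTopics in scope; InterruptedException propagates
to the caller):

    CreateTopicResults created = admin.createTopics(newTopics);
    for (Map.Entry<String, KafkaFuture<Void>> entry : created.results().entrySet()) {
        try {
            entry.getValue().get();
            System.out.println("created " + entry.getKey());
        } catch (ExecutionException e) {
            System.out.println("failed to create " + entry.getKey() + ": " + e.getCause());
        }
    }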

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
new file mode 100644
index 0000000..c1f3944
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the createTopics call.
+ */
+@InterfaceStability.Unstable
+public class CreateTopicsOptions {
+    private Integer timeoutMs = null;
+    private boolean validateOnly = false;
+
+    public CreateTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+    public CreateTopicsOptions validateOnly(boolean validateOnly) {
+        this.validateOnly = validateOnly;
+        return this;
+    }
+
+    public boolean validateOnly() {
+        return validateOnly;
+    }
+}
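
The setters return this, so the options compose as a chain. For example, a broker-side
dry run with a five-second timeout:

    CreateTopicsOptions options = new CreateTopicsOptions()
            .validateOnly(true)   // validate the request without creating anything
            .timeoutMs(5000);
    admin.createTopics(newTopics, options).all().get();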

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
new file mode 100644
index 0000000..3dd4889
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicResults.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the deleteTopics call.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicResults {
+    final Map<String, KafkaFuture<Void>> futures;
+
+    DeleteTopicResults(Map<String, KafkaFuture<Void>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual deletions.
+     */
+    public Map<String, KafkaFuture<Void>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic deletions succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
+}
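
Because deletion completes asynchronously on the brokers (see the deleteTopics javadoc
earlier in this patch), a caller that needs the names gone before proceeding can block
on all(). A sketch, assuming java.util.Arrays is imported:

    admin.deleteTopics(Arrays.asList("topic-a", "topic-b")).all().get();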

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
new file mode 100644
index 0000000..3630968
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for deleteTopics.
+ */
+@InterfaceStability.Unstable
+public class DeleteTopicsOptions {
+    private Integer timeoutMs = null;
+
+    public DeleteTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
new file mode 100644
index 0000000..604ee13
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterOptions {
+    private Integer timeoutMs = null;
+
+    public DescribeClusterOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
new file mode 100644
index 0000000..5ee834b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The results of the describeCluster call.
+ */
+@InterfaceStability.Unstable
+public class DescribeClusterResults {
+    private final KafkaFuture<Collection<Node>> future;
+
+    DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Returns a future which yields a collection of nodes.
+     */
+    public KafkaFuture<Collection<Node>> nodes() {
+        return future;
+    }
+}
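
A sketch of consuming the result; id(), host() and port() are existing accessors on
org.apache.kafka.common.Node:

    Collection<Node> nodes = admin.describeCluster().nodes().get();
    for (Node node : nodes)
        System.out.println("node " + node.id() + " at " + node.host() + ":" + node.port());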

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
new file mode 100644
index 0000000..1bf6632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for describeTopics.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsOptions {
+    private Integer timeoutMs = null;
+
+    public DescribeTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
new file mode 100644
index 0000000..630ba95
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The results of the describeTopics call.
+ */
+@InterfaceStability.Unstable
+public class DescribeTopicsResults {
+    private final Map<String, KafkaFuture<TopicDescription>> futures;
+
+    DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from topic names to futures which can be used to check the status of
+     * individual topic descriptions.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> results() {
+        return futures;
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions succeed.
+     */
+    public KafkaFuture<Map<String, TopicDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+            thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() {
+                @Override
+                public Map<String, TopicDescription> apply(Void v) {
+                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+                        try {
+                            descriptions.put(entry.getKey(), entry.getValue().get());
+                        } catch (InterruptedException | ExecutionException e) {
+                            // This should be unreachable, because allOf ensured that all the futures
+                            // completed successfully.
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return descriptions;
+                }
+            });
+    }
+}
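
And the matching consumption sketch; since TopicDescription's accessors do not appear
in this excerpt, only its toString() is relied on here:

    Map<String, TopicDescription> descriptions =
            admin.describeTopics(Arrays.asList("my-topic")).all().get();
    System.out.println(descriptions.get("my-topic"));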