Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/01 23:24:49 UTC

[2/3] kafka git commit: KAFKA-3265; Add a public AdminClient API in Java (KIP-117)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
new file mode 100644
index 0000000..cf37aa7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+
+/**
+ * An administrative client for Kafka which supports managing and inspecting topics, brokers,
+ * and configurations.
+ */
+@InterfaceStability.Unstable
+public class KafkaAdminClient extends AdminClient {
+    private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class);
+
+    /**
+     * The maximum number of times to retry a call before failing it.
+     */
+    private static final int MAX_CALL_RETRIES = 5;
+
+    /**
+     * The next integer to use when generating a client ID for a KafkaAdminClient
+     * for which the user has not specified an explicit name.
+     */
+    private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
+    /**
+     * The prefix to use for the JMX metrics for this class.
+     */
+    private static final String JMX_PREFIX = "kafka.admin.client";
+
+    /**
+     * The default timeout to use for an operation.
+     */
+    private final int defaultTimeoutMs;
+
+    /**
+     * The name of this AdminClient instance.
+     */
+    private final String clientId;
+
+    /**
+     * Provides the time.
+     */
+    private final Time time;
+
+    /**
+     * The cluster metadata used by the KafkaClient.
+     */
+    private final Metadata metadata;
+
+    /**
+     * The metrics for this KafkaAdminClient.
+     */
+    private final Metrics metrics;
+
+    /**
+     * The network client to use.
+     */
+    private final KafkaClient client;
+
+    /**
+     * The runnable used in the service thread for this admin client.
+     */
+    private final AdminClientRunnable runnable;
+
+    /**
+     * The network service thread for this admin client.
+     */
+    private final Thread thread;
+
+    /**
+     * True if this client is closed.
+     */
+    private volatile boolean closed = false;
+
+    /**
+     * Get or create a list value from a map.
+     *
+     * @param map   The map to get or create the element from.
+     * @param key   The key.
+     * @param <K>   The key type.
+     * @param <V>   The value type.
+     * @return      The list value.
+     */
+    static <K, V> List<V> getOrCreateListValue(Map<K, List<V>> map, K key) {
+        List<V> list = map.get(key);
+        if (list != null)
+            return list;
+        list = new LinkedList<>();
+        map.put(key, list);
+        return list;
+    }
+
+    /**
+     * Send an exception to every element in a collection of KafkaFutureImpls.
+     *
+     * @param futures   The collection of KafkaFutureImpl objects.
+     * @param exc       The exception
+     * @param <T>       The KafkaFutureImpl result type.
+     */
+    private static <T> void completeAllExceptionally(Collection<KafkaFutureImpl<T>> futures, Throwable exc) {
+        for (KafkaFutureImpl<?> future : futures) {
+            future.completeExceptionally(exc);
+        }
+    }
+
+    /**
+     * Get the current time remaining before a deadline as an integer.
+     *
+     * @param now           The current time in milliseconds.
+     * @param deadlineMs    The deadline time in milliseconds.
+     * @return              The time delta in milliseconds.
+     */
+    static int calcTimeoutMsRemainingAsInt(long now, long deadlineMs) {
+        long deltaMs = deadlineMs - now;
+        if (deltaMs > Integer.MAX_VALUE)
+            deltaMs = Integer.MAX_VALUE;
+        else if (deltaMs < Integer.MIN_VALUE)
+            deltaMs = Integer.MIN_VALUE;
+        return (int) deltaMs;
+    }
+
+    /**
+     * Generate the client id based on the configuration.
+     *
+     * @param config    The configuration
+     *
+     * @return          The client id
+     */
+    static String generateClientId(AdminClientConfig config) {
+        String clientId = config.getString(AdminClientConfig.CLIENT_ID_CONFIG);
+        if (!clientId.isEmpty())
+            return clientId;
+        return "adminclient-" + ADMIN_CLIENT_ID_SEQUENCE.getAndIncrement();
+    }
+
+    /**
+     * Get the deadline for a particular call.
+     *
+     * @param now               The current time in milliseconds.
+     * @param optionTimeoutMs   The timeout option given by the user.
+     *
+     * @return                  The deadline in milliseconds.
+     */
+    private long calcDeadlineMs(long now, Integer optionTimeoutMs) {
+        if (optionTimeoutMs != null)
+            return now + Math.max(0, optionTimeoutMs);
+        return now + defaultTimeoutMs;
+    }
+
+    /**
+     * Pretty-print an exception.
+     *
+     * @param throwable     The exception.
+     *
+     * @return              A compact human-readable string.
+     */
+    static String prettyPrintException(Throwable throwable) {
+        if (throwable == null)
+            return "Null exception.";
+        if (throwable.getMessage() != null) {
+            return throwable.getClass().getSimpleName() + ": " + throwable.getMessage();
+        }
+        return throwable.getClass().getSimpleName();
+    }
+
+    static KafkaAdminClient create(AdminClientConfig config) {
+        Metadata metadata = null;
+        Metrics metrics = null;
+        NetworkClient networkClient = null;
+        Time time = Time.SYSTEM;
+        String clientId = generateClientId(config);
+        ChannelBuilder channelBuilder = null;
+        Selector selector = null;
+        ApiVersions apiVersions = new ApiVersions();
+
+        try {
+            metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class);
+            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
+                .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+                .tags(metricTags);
+            reporters.add(new JmxReporter(JMX_PREFIX));
+            metrics = new Metrics(metricConfig, reporters, time);
+            String metricGrpPrefix = "admin-client";
+            channelBuilder = ClientUtils.createChannelBuilder(config);
+            selector = new Selector(config.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                    metrics, time, metricGrpPrefix, channelBuilder);
+            networkClient = new NetworkClient(
+                selector,
+                metadata,
+                clientId,
+                100,
+                config.getLong(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
+                config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
+                config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                time,
+                true,
+                apiVersions);
+            channelBuilder = null;
+            return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient);
+        } catch (Throwable exc) {
+            closeQuietly(metrics, "Metrics");
+            closeQuietly(networkClient, "NetworkClient");
+            closeQuietly(selector, "Selector");
+            closeQuietly(channelBuilder, "ChannelBuilder");
+            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+        }
+    }
+
+    static KafkaAdminClient create(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+        Metrics metrics = null;
+        Time time = Time.SYSTEM;
+        String clientId = generateClientId(config);
+
+        try {
+            metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time);
+            return new KafkaAdminClient(config, clientId, time, metadata, metrics, client);
+        } catch (Throwable exc) {
+            closeQuietly(metrics, "Metrics");
+            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+        }
+    }
+
+    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata,
+                     Metrics metrics, KafkaClient client) {
+        this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        this.clientId = clientId;
+        this.time = time;
+        this.metadata = metadata;
+        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+        this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+        this.metrics = metrics;
+        this.client = client;
+        this.runnable = new AdminClientRunnable();
+        String threadName = "kafka-admin-client-thread" + (clientId.length() > 0 ? " | " + clientId : "");
+        this.thread = new KafkaThread(threadName, runnable, false);
+        config.logUnused();
+        log.debug("Created Kafka admin client {}", this.clientId);
+        thread.start();
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        client.wakeup(); // Wake the thread, if it is blocked inside poll().
+        try {
+            // Wait for the thread to be joined.
+            thread.join();
+            log.debug("{}: closed.", clientId);
+        } catch (InterruptedException e) {
+            log.debug("{}: interrupted while joining I/O thread", clientId, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * An interface for providing a node for a call.
+     */
+    private interface NodeProvider {
+        Node provide();
+    }
+
+    /**
+     * Provides a constant node which is known at construction time.
+     */
+    private static class ConstantAdminNodeProvider implements NodeProvider {
+        private final Node node;
+
+        ConstantAdminNodeProvider(Node node) {
+            this.node = node;
+        }
+
+        @Override
+        public Node provide() {
+            return node;
+        }
+    }
+
+    /**
+     * Provides the controller node.
+     */
+    private class ControllerNodeProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return metadata.fetch().controller();
+        }
+    }
+
+    /**
+     * Provides the least loaded node.
+     */
+    private class LeastLoadedNodeProvider implements NodeProvider {
+        @Override
+        public Node provide() {
+            return client.leastLoadedNode(time.milliseconds());
+        }
+    }
+
+    private abstract class Call {
+        private final String callName;
+        private final long deadlineMs;
+        private final NodeProvider nodeProvider;
+        private int tries = 0;
+
+        Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
+            this.callName = callName;
+            this.deadlineMs = deadlineMs;
+            this.nodeProvider = nodeProvider;
+        }
+
+        /**
+         * Handle a failure.
+         *
+         * Depending on what the exception is and how many times we have already tried, we may choose to
+         * fail the Call, or retry it.  It is important to print the stack traces here in some cases,
+         * since they are not necessarily preserved in ApiException objects.
+         *
+         * @param now           The current time in milliseconds.
+         * @param throwable     The failure exception.
+         */
+        final void fail(long now, Throwable throwable) {
+            // If this is an UnsupportedVersionException that we can retry, do so.
+            if ((throwable instanceof UnsupportedVersionException) &&
+                     handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
+                log.trace("{} attempting protocol downgrade.", this);
+                runnable.call(this, now);
+                return;
+            }
+            tries++;
+            // If the call has timed out, fail.
+            if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            // If the exception is not retryable, fail.
+            if (!(throwable instanceof RetriableException)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} failed with non-retriable exception after {} attempt(s)", this, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            // If we are out of retries, fail.
+            if (tries > MAX_CALL_RETRIES) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} failed after {} attempt(s)", this, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(throwable);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("{} failed: {}.  Beginning retry #{}",
+                    this, prettyPrintException(throwable), tries);
+            }
+            runnable.call(this, now);
+        }
+
+        /**
+         * Create an AbstractRequest.Builder for this Call.
+         *
+         * @param timeoutMs The timeout in milliseconds.
+         *
+         * @return          The AbstractRequest builder.
+         */
+        abstract AbstractRequest.Builder createRequest(int timeoutMs);
+
+        /**
+         * Process the call response.
+         *
+         * @param abstractResponse  The AbstractResponse.
+         */
+        abstract void handleResponse(AbstractResponse abstractResponse);
+
+        /**
+         * Handle a failure.  This will only be called if the failure exception was not
+         * retryable, or if we hit a timeout.
+         *
+         * @param throwable     The exception.
+         */
+        abstract void handleFailure(Throwable throwable);
+
+        /**
+         * Handle an UnsupportedVersionException.
+         *
+         * @param exception     The exception.
+         *
+         * @return              True if the exception can be handled; false otherwise.
+         */
+        boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "Call(callName=" + callName + ", deadlineMs=" + deadlineMs + ")";
+        }
+    }
+
+    private final class AdminClientRunnable implements Runnable {
+        /**
+         * Pending calls.  Protected by the object monitor.
+         */
+        private List<Call> newCalls = new LinkedList<>();
+
+        /**
+         * Check if the AdminClient metadata is ready.
+         * We need to know who the controller is, and have a non-empty view of the cluster.
+         *
+         * @param prevMetadataVersion       The previous metadata version which wasn't usable.
+         * @return                          null if the metadata is usable; the current metadata
+         *                                  version otherwise
+         */
+        private Integer checkMetadataReady(Integer prevMetadataVersion) {
+            if (prevMetadataVersion != null) {
+                if (prevMetadataVersion == metadata.version())
+                    return prevMetadataVersion;
+            }
+            Cluster cluster = metadata.fetch();
+            if (cluster.nodes().isEmpty()) {
+                log.trace("{}: metadata is not ready yet.  No cluster nodes found.", clientId);
+                return metadata.requestUpdate();
+            }
+            if (cluster.controller() == null) {
+                log.trace("{}: metadata is not ready yet.  No controller found.", clientId);
+                return metadata.requestUpdate();
+            }
+            if (prevMetadataVersion != null) {
+                log.trace("{}: metadata is now ready.", clientId);
+            }
+            return null;
+        }
+
+        /**
+         * Time out a list of calls.
+         *
+         * @param now       The current time in milliseconds.
+         * @param calls     The collection of calls.  Must be sorted from oldest to newest.
+         *
+         * @return          The number of calls which were timed out.
+         */
+        private int timeoutCalls(long now, Collection<Call> calls) {
+            int numTimedOut = 0;
+            for (Iterator<Call> iter = calls.iterator(); iter.hasNext(); ) {
+                Call call = iter.next();
+                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+                    call.fail(now, new TimeoutException());
+                    iter.remove();
+                    numTimedOut++;
+                }
+            }
+            return numTimedOut;
+        }
+
+        /**
+         * Time out the elements in the newCalls list which are expired.
+         *
+         * @param now       The current time in milliseconds.
+         */
+        private synchronized void timeoutNewCalls(long now) {
+            int numTimedOut = timeoutCalls(now, newCalls);
+            if (numTimedOut > 0) {
+                log.debug("{}: timed out {} new calls.", clientId, numTimedOut);
+            }
+        }
+
+        /**
+         * Time out calls which have been assigned to nodes.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         */
+        private void timeoutCallsToSend(long now, Map<Node, List<Call>> callsToSend) {
+            int numTimedOut = 0;
+            for (List<Call> callList : callsToSend.values()) {
+                numTimedOut += timeoutCalls(now, callList);
+            }
+            if (numTimedOut > 0)
+                log.debug("{}: timed out {} call(s) with assigned nodes.", clientId, numTimedOut);
+        }
+
+        /**
+         * Choose nodes for the calls in the callsToSend list.
+         *
+         * This function holds the lock for the minimum amount of time, to avoid blocking
+         * users of AdminClient who will also take the lock to add new calls.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         */
+        private void chooseNodesForNewCalls(long now, Map<Node, List<Call>> callsToSend) {
+            List<Call> newCallsToAdd = null;
+            synchronized (this) {
+                if (newCalls.isEmpty()) {
+                    return;
+                }
+                newCallsToAdd = newCalls;
+                newCalls = new LinkedList<>();
+            }
+            for (Call call : newCallsToAdd) {
+                chooseNodeForNewCall(now, callsToSend, call);
+            }
+        }
+
+        /**
+         * Choose a node for a new call.
+         *
+         * @param now           The current time in milliseconds.
+         * @param callsToSend   A map of nodes to the calls they need to handle.
+         * @param call          The call.
+         */
+        private void chooseNodeForNewCall(long now, Map<Node, List<Call>> callsToSend, Call call) {
+            Node node = call.nodeProvider.provide();
+            if (node == null) {
+                call.fail(now, new BrokerNotAvailableException(
+                    String.format("Error choosing node for %s: no node found.", call.callName)));
+                return;
+            }
+            log.trace("{}: assigned {} to {}", clientId, call, node);
+            getOrCreateListValue(callsToSend, node).add(call);
+        }
+
+        /**
+         * Send the calls which are ready.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param callsToSend           The calls to send, by node.
+         * @param correlationIdToCalls  A map of correlation IDs to calls.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         *
+         * @return                      The minimum timeout we need for poll().
+         */
+        private long sendEligibleCalls(long now, Map<Node, List<Call>> callsToSend,
+                         Map<Integer, Call> correlationIdToCalls, Map<String, List<Call>> callsInFlight) {
+            long pollTimeout = Long.MAX_VALUE;
+            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator();
+                     iter.hasNext(); ) {
+                Map.Entry<Node, List<Call>> entry = iter.next();
+                List<Call> calls = entry.getValue();
+                if (calls.isEmpty()) {
+                    iter.remove();
+                    continue;
+                }
+                Node node = entry.getKey();
+                if (!client.ready(node, now)) {
+                    long nodeTimeout = client.connectionDelay(node, now);
+                    pollTimeout = Math.min(pollTimeout, nodeTimeout);
+                    log.trace("{}: client is not ready to send to {}.  Must delay {} ms", clientId, node, nodeTimeout);
+                    continue;
+                }
+                Call call = calls.remove(0);
+                int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
+                AbstractRequest.Builder<?> requestBuilder = null;
+                try {
+                    requestBuilder = call.createRequest(timeoutMs);
+                } catch (Throwable throwable) {
+                    call.fail(now, new KafkaException(String.format(
+                        "Internal error sending %s to %s.", call.callName, node), throwable));
+                    continue;
+                }
+                ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true);
+                log.trace("{}: sending {} to {}. correlationId={}", clientId, requestBuilder, node,
+                    clientRequest.correlationId());
+                client.send(clientRequest, now);
+                getOrCreateListValue(callsInFlight, node.idString()).add(call);
+                correlationIdToCalls.put(clientRequest.correlationId(), call);
+            }
+            return pollTimeout;
+        }
+
+        /**
+         * Time out expired calls that are in flight.
+         *
+         * Calls that are in flight may have been partially or completely sent over the wire.  They may
+         * even be in the process of being processed by the remote server.  At the moment, our only option
+         * to time them out is to close the entire connection.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         */
+        private void timeoutCallsInFlight(long now, Map<String, List<Call>> callsInFlight) {
+            int numTimedOut = 0;
+            for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
+                List<Call> contexts = entry.getValue();
+                if (contexts.isEmpty())
+                    continue;
+                String nodeId = entry.getKey();
+                // We assume that the first element in the list is the earliest.  So it should be the
+                // only one we need to check the timeout for.
+                Call call = contexts.get(0);
+                if (calcTimeoutMsRemainingAsInt(now, call.deadlineMs) < 0) {
+                    log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
+                    client.close(nodeId);
+                    numTimedOut++;
+                    // We don't remove anything from the callsInFlight data structure.  Because the connection
+                    // has been closed, the calls should be returned by the next client#poll(),
+                    // and handled at that point.
+                }
+            }
+            if (numTimedOut > 0)
+                log.debug("{}: timed out {} call(s) in flight.", clientId, numTimedOut);
+        }
+
+        /**
+         * Handle responses from the server.
+         *
+         * @param now                   The current time in milliseconds.
+         * @param responses             The latest responses from KafkaClient.
+         * @param callsInFlight         A map of nodes to the calls they have in flight.
+         * @param correlationIdToCall   A map of correlation IDs to calls.
+         */
+        private void handleResponses(long now, List<ClientResponse> responses, Map<String, List<Call>> callsInFlight,
+                Map<Integer, Call> correlationIdToCall) {
+            for (ClientResponse response : responses) {
+                int correlationId = response.requestHeader().correlationId();
+
+                Call call = correlationIdToCall.get(correlationId);
+                if (call == null) {
+                    // If the server returns information about a correlation ID we didn't use yet,
+                    // an internal server error has occurred.  Close the connection and log an error message.
+                    log.error("Internal server error on {}: server returned information about unknown " +
+                        "correlation ID {}", response.destination(), correlationId);
+                    client.close(response.destination());
+                    continue;
+                }
+
+                // Stop tracking this call.
+                correlationIdToCall.remove(correlationId);
+                getOrCreateListValue(callsInFlight, response.destination()).remove(call);
+
+                // Handle the result of the call.  This may involve retrying the call, if we got a
+                // retriable exception.
+                if (response.versionMismatch() != null) {
+                    call.fail(now, response.versionMismatch());
+                } else if (response.wasDisconnected()) {
+                    call.fail(now, new DisconnectException(String.format(
+                        "Cancelled %s request with correlation id %s due to node %s being disconnected",
+                        call.callName, correlationId, response.destination())));
+                } else {
+                    try {
+                        call.handleResponse(response.responseBody());
+                        if (log.isTraceEnabled())
+                            log.trace("{}: {} got response {}", clientId, call, response.responseBody());
+                    } catch (Throwable t) {
+                        if (log.isTraceEnabled())
+                            log.trace("{}: {} handleResponse failed with {}", clientId, call, prettyPrintException(t));
+                        call.fail(now, t);
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            /**
+             * Maps nodes to calls that we want to send.
+             */
+            Map<Node, List<Call>> callsToSend = new HashMap<>();
+
+            /**
+             * Maps node ID strings to calls that have been sent.
+             */
+            Map<String, List<Call>> callsInFlight = new HashMap<>();
+
+            /**
+             * Maps correlation IDs to calls that have been sent.
+             */
+            Map<Integer, Call> correlationIdToCalls = new HashMap<>();
+
+            /**
+             * The previous metadata version which wasn't usable, or null if there is none.
+             */
+            Integer prevMetadataVersion = null;
+
+            long now = time.milliseconds();
+            log.trace("{} thread starting", clientId);
+            while (true) {
+                // Check if the AdminClient is shutting down.
+                if (closed)
+                    break;
+
+                // Handle timeouts.
+                timeoutNewCalls(now);
+                timeoutCallsToSend(now, callsToSend);
+                timeoutCallsInFlight(now, callsInFlight);
+
+                // Handle new calls and metadata update requests.
+                prevMetadataVersion = checkMetadataReady(prevMetadataVersion);
+                long pollTimeout = 1200000; // 20 minutes
+                if (prevMetadataVersion == null) {
+                    chooseNodesForNewCalls(now, callsToSend);
+                    pollTimeout = Math.min(pollTimeout,
+                        sendEligibleCalls(now, callsToSend, correlationIdToCalls, callsInFlight));
+                }
+
+                // Wait for network responses.
+                log.trace("{}: entering KafkaClient#poll(timeout={})", clientId, pollTimeout);
+                List<ClientResponse> responses = client.poll(pollTimeout, now);
+                log.trace("{}: KafkaClient#poll retrieved {} response(s)", clientId, responses.size());
+
+                // Update the current time and handle the latest responses.
+                now = time.milliseconds();
+                handleResponses(now, responses, callsInFlight, correlationIdToCalls);
+            }
+            int numTimedOut = 0;
+            synchronized (this) {
+                numTimedOut += timeoutCalls(Long.MAX_VALUE, newCalls);
+            }
+            numTimedOut += timeoutCalls(Long.MAX_VALUE, correlationIdToCalls.values());
+            if (numTimedOut > 0) {
+                log.debug("{}: timed out {} remaining operations.", clientId, numTimedOut);
+            }
+            closeQuietly(client, "KafkaClient");
+            closeQuietly(metrics, "Metrics");
+            log.debug("{}: exiting AdminClientRunnable thread.", clientId);
+        }
+
+        void call(Call call, long now) {
+            if (log.isDebugEnabled()) {
+                log.debug("{}: queueing {} with a timeout {} ms from now.",
+                    clientId, call, call.deadlineMs - now);
+            }
+            synchronized (this) {
+                newCalls.add(call);
+            }
+            client.wakeup();
+        }
+    }
+
+    @Override
+    public CreateTopicResults createTopics(final Collection<NewTopic> newTopics,
+                                           final CreateTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
+        for (NewTopic newTopic : newTopics) {
+            topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+        }
+        final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
+        for (NewTopic newTopic : newTopics) {
+            topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            public AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly());
+            }
+
+            @Override
+            public void handleResponse(AbstractResponse abstractResponse) {
+                CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, CreateTopicsResponse.Error> entry : response.errors().entrySet()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                    } else {
+                        ApiException exception = entry.getValue().exception();
+                        if (exception != null) {
+                            future.completeExceptionally(exception);
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                }
+                // The server should send back a response for every topic.  But do a sanity check anyway.
+                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    if (!future.isDone()) {
+                        future.completeExceptionally(new ApiException("The server response did not " +
+                            "contain a reference to topic " + entry.getKey()));
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+    }
+
+    @Override
+    public DeleteTopicResults deleteTopics(final Collection<String> topicNames,
+                                           DeleteTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+        for (String topicName : topicNames) {
+            topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, Errors> entry : response.errors().entrySet()) {
+                    KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
+                    if (future == null) {
+                        log.warn("Server response mentioned unknown topic {}", entry.getKey());
+                    } else {
+                        ApiException exception = entry.getValue().exception();
+                        if (exception != null) {
+                            future.completeExceptionally(exception);
+                        } else {
+                            future.complete(null);
+                        }
+                    }
+                }
+                // The server should send back a response for every topic.  But do a sanity check anyway.
+                for (Map.Entry<String, KafkaFutureImpl<Void>> entry : topicFutures.entrySet()) {
+                    KafkaFutureImpl<Void> future = entry.getValue();
+                    if (!future.isDone()) {
+                        future.completeExceptionally(new ApiException("The server response did not " +
+                            "contain a reference to topic " + entry.getKey()));
+                    }
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+    }
+
+    @Override
+    public ListTopicsResults listTopics(final ListTopicsOptions options) {
+        final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return MetadataRequest.Builder.allTopics();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                Cluster cluster = response.cluster();
+                Map<String, TopicListing> topicListing = new HashMap<>();
+                for (String topicName : cluster.topics()) {
+                    boolean internal = cluster.internalTopics().contains(topicName);
+                    if (!internal || options.listInternal())
+                        topicListing.put(topicName, new TopicListing(topicName, internal));
+                }
+                topicListingFuture.complete(topicListing);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                topicListingFuture.completeExceptionally(throwable);
+            }
+        }, now);
+        return new ListTopicsResults(topicListingFuture);
+    }
+
+    @Override
+    public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        for (String topicName : topicNames) {
+            topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
+        }
+        final long now = time.milliseconds();
+        runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new ArrayList<>(topicNames));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                // Handle server responses for particular topics.
+                for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                    String topicName = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
+                    Errors topicError = response.errors().get(topicName);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+                    Cluster cluster = response.cluster();
+                    if (!cluster.topics().contains(topicName)) {
+                        future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found."));
+                        continue;
+                    }
+                    boolean isInternal = cluster.internalTopics().contains(topicName);
+                    TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>();
+                    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName);
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                            partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()),
+                            Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.put(partitionInfo.partition(), topicPartitionInfo);
+                    }
+                    TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
+                    future.complete(topicDescription);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        }, now);
+        return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+    }
+
+    @Override
+    public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
+        final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(Collections.<String>emptyList());
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) abstractResponse;
+                describeClusterFuture.complete(response.brokers());
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                describeClusterFuture.completeExceptionally(throwable);
+            }
+        }, now);
+        return new DescribeClusterResults(describeClusterFuture);
+    }
+
+    @Override
+    public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) {
+        final long now = time.milliseconds();
+        final long deadlineMs = calcDeadlineMs(now, options.timeoutMs());
+        Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>();
+        for (final Node node : nodes) {
+            final KafkaFutureImpl<NodeApiVersions> nodeFuture = new KafkaFutureImpl<>();
+            nodeFutures.put(node, nodeFuture);
+            runnable.call(new Call("apiVersions", deadlineMs, new ConstantAdminNodeProvider(node)) {
+                    @Override
+                    public AbstractRequest.Builder createRequest(int timeoutMs) {
+                        return new ApiVersionsRequest.Builder();
+                    }
+
+                    @Override
+                    public void handleResponse(AbstractResponse abstractResponse) {
+                        ApiVersionsResponse response = (ApiVersionsResponse) abstractResponse;
+                        nodeFuture.complete(new NodeApiVersions(response.apiVersions()));
+                    }
+
+                    @Override
+                    public void handleFailure(Throwable throwable) {
+                        nodeFuture.completeExceptionally(throwable);
+                    }
+                }, now);
+        }
+        return new ApiVersionsResults(nodeFutures);
+
+    }
+}
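
For context, a minimal usage sketch of the new API, exercising only what is
visible in this part of the patch.  AdminClient.create(Properties) and the
AutoCloseable close() contract are assumptions taken from the companion
AdminClient.java (not shown in this part):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    public class AdminClientSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // create(Properties) is assumed from AdminClient.java in this series.
            try (AdminClient admin = AdminClient.create(props)) {
                // listTopics() queues a Call on the service thread; names()
                // exposes the result as a KafkaFuture, so get() blocks here.
                for (String name : admin.listTopics(new ListTopicsOptions().listInternal(true))
                         .names().get()) {
                    System.out.println(name);
                }
            }
        }
    }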

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
new file mode 100644
index 0000000..02a0a40
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for listTopics.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsOptions {
+    private Integer timeoutMs = null;
+    private boolean listInternal = false;
+
+    public ListTopicsOptions timeoutMs(Integer timeoutMs) {
+        this.timeoutMs = timeoutMs;
+        return this;
+    }
+
+    public Integer timeoutMs() {
+        return timeoutMs;
+    }
+
+    /**
+     * Set whether we should list internal topics.
+     *
+     * @param listInternal  Whether we should list internal topics.
+     * @return              This ListTopicsOptions object.
+     */
+    public ListTopicsOptions listInternal(boolean listInternal) {
+        this.listInternal = listInternal;
+        return this;
+    }
+
+    public boolean listInternal() {
+        return listInternal;
+    }
+}
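
The options classes follow a fluent pattern: each setter returns this, so the
calls chain.  A short sketch; per the KafkaAdminClient above, a null timeout
falls back to the client's request.timeout.ms default:

    ListTopicsOptions options = new ListTopicsOptions()
        .timeoutMs(30000)      // per-call deadline of 30 seconds
        .listInternal(true);   // include internal topics in the listing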

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
new file mode 100644
index 0000000..7e9448d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the listTopics call.
+ */
+@InterfaceStability.Unstable
+public class ListTopicsResults {
+    final KafkaFuture<Map<String, TopicListing>> future;
+
+    ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a map of topic names to TopicListing objects.
+     */
+    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() {
+        return future;
+    }
+
+    /**
+     * Return a future which yields a collection of TopicListing objects.
+     */
+    public KafkaFuture<Collection<TopicListing>> descriptions() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() {
+            @Override
+            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.values();
+            }
+        });
+    }
+
+    /**
+     * Return a future which yields a collection of topic names.
+     */
+    public KafkaFuture<Collection<String>> names() {
+        return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() {
+            @Override
+            public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) {
+                return namesToDescriptions.keySet();
+            }
+        });
+    }
+}
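
All three accessors are views over the same underlying future, and
KafkaFuture.thenApply (used above in descriptions() and names()) can chain
further non-blocking transformations.  A sketch, given a ListTopicsResults
results from AdminClient#listTopics; nothing blocks until get() is called:

    KafkaFuture<Integer> topicCount = results.names().thenApply(
        new KafkaFuture.Function<Collection<String>, Integer>() {
            @Override
            public Integer apply(Collection<String> names) {
                return names.size();
            }
        });
    System.out.println("topics: " + topicCount.get());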

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
new file mode 100644
index 0000000..a1f6fb5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A request to create a new topic through the AdminClient API. 
+ */
+public class NewTopic {
+    private final String name;
+    private final int numPartitions;
+    private final short replicationFactor;
+    private final Map<Integer, List<Integer>> replicasAssignments;
+    private Map<String, String> configs = null;
+
+    /**
+     * Create a new topic with a fixed replication factor and number of partitions.
+     */
+    public NewTopic(String name, int numPartitions, short replicationFactor) {
+        this.name = name;
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.replicasAssignments = null;
+    }
+
+    /**
+     * A request to create a new topic with a specific replica assignment configuration.
+     */
+    public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
+        this.name = name;
+        this.numPartitions = -1;
+        this.replicationFactor = -1;
+        this.replicasAssignments = replicasAssignments;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the configuration to use on the new topic.
+     *
+     * @param configs               The configuration map.
+     * @return                      This NewTopic object.
+     */
+    public NewTopic configs(Map<String, String> configs) {
+        this.configs = configs;
+        return this;
+    }
+
+    TopicDetails convertToTopicDetails() {
+        if (replicasAssignments != null) {
+            if (configs != null) {
+                return new TopicDetails(replicasAssignments, configs);
+            } else {
+                return new TopicDetails(replicasAssignments);
+            }
+        } else {
+            if (configs != null) {
+                return new TopicDetails(numPartitions, replicationFactor, configs);
+            } else {
+                return new TopicDetails(numPartitions, replicationFactor);
+            }
+        }
+    }
+}
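
The two constructors are mutually exclusive ways to size a topic: let the
brokers place replicas given a partition count and replication factor, or pin
an explicit per-partition replica assignment.  A sketch; the topic names and
the retention.ms value are illustrative only:

    // Broker-assigned placement: 6 partitions, replication factor 3.
    NewTopic sized = new NewTopic("events", 6, (short) 3)
        .configs(Collections.singletonMap("retention.ms", "86400000"));

    // Explicit placement: partition 0 on brokers 1 and 2, partition 1 on 2 and 3.
    Map<Integer, List<Integer>> assignment = new HashMap<>();
    assignment.put(0, Arrays.asList(1, 2));
    assignment.put(1, Arrays.asList(2, 3));
    NewTopic placed = new NewTopic("events-manual", assignment);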

http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
new file mode 100644
index 0000000..2fc4442
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.NavigableMap;
+
+/**
+ * A detailed description of a single topic in the cluster.
+ */
+public class TopicDescription {
+    private final String name;
+    private final boolean internal;
+    private final NavigableMap<Integer, TopicPartitionInfo> partitions;
+
+    TopicDescription(String name, boolean internal,
+                    NavigableMap<Integer, TopicPartitionInfo> partitions) {
+        this.name = name;
+        this.internal = internal;
+        this.partitions = partitions;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public boolean internal() {
+        return internal;
+    }
+
+    public NavigableMap<Integer, TopicPartitionInfo> partitions() {
+        return partitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(name=" + name + ", internal=" + internal + ", partitions=" +
+            Utils.mkString(partitions, "[", "]", "=", ",") + ")";
+    }
+}

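[Editor's note: a sketch of consuming a TopicDescription, e.g. one obtained from the describeTopics() call added in this patch; the print format is ours, and imports are elided.]

    // Walk the partitions of a described topic in ascending partition order
    // (partitions() is a NavigableMap keyed by partition id).
    static void printDescription(TopicDescription description) {
        System.out.println("topic " + description.name() +
            (description.internal() ? " (internal)" : ""));
        for (TopicPartitionInfo info : description.partitions().values()) {
            System.out.println("  partition " + info.partition() +
                ": leader=" + info.leader() +
                ", isr size=" + info.isr().size() +
                ", replica count=" + info.replicas().size());
        }
    }
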
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
new file mode 100644
index 0000000..4c25551
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A listing of a topic in the cluster.
+ */
+public class TopicListing {
+    private final String name;
+    private final boolean internal;
+
+    TopicListing(String name, boolean internal) {
+        this.name = name;
+        this.internal = internal;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public boolean internal() {
+        return internal;
+    }
+
+    @Override
+    public String toString() {
+        return "(name=" + name + ", internal=" + internal + ")";
+    }
+}

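[Editor's note: a small sketch of filtering a listing result, e.g. from the listTopics() call added in this patch; the helper name is ours and imports are elided.]

    // Collect the names of the non-internal topics from a set of listings.
    static List<String> userTopics(Collection<TopicListing> listings) {
        List<String> names = new ArrayList<>();
        for (TopicListing listing : listings) {
            if (!listing.internal())
                names.add(listing.name());
        }
        return names;
    }
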
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
new file mode 100644
index 0000000..b304802
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
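+/**
+ * A description of a single topic partition, including its leader, replica
+ * set and in-sync replica set (ISR).
+ */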
+public class TopicPartitionInfo {
+    private final int partition;
+    private final Node leader;
+    private final List<Node> replicas;
+    private final List<Node> isr;
+
+    TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
+        this.partition = partition;
+        this.leader = leader;
+        this.replicas = replicas;
+        this.isr = isr;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public Node leader() {
+        return leader;
+    }
+
+    public List<Node> replicas() {
+        return replicas;
+    }
+
+    public List<Node> isr() {
+        return isr;
+    }
+
+    @Override
+    public String toString() {
+        return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+    }
+}

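[Editor's note: one check these accessors make easy, sketched below; the helper name is ours, not part of the patch.]

    // A partition is under-replicated when fewer replicas are in sync than
    // are assigned.
    static boolean isUnderReplicated(TopicPartitionInfo info) {
        return info.isr().size() < info.replicas().size();
    }
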
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index ba1d2af..6619b4c 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -37,6 +37,7 @@ public final class Cluster {
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> internalTopics;
+    private final Node controller;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
     private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@@ -54,7 +55,7 @@ public final class Cluster {
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
-        this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet());
+        this(null, false, nodes, partitions, unauthorizedTopics, Collections.<String>emptySet(), null);
     }
 
 
@@ -68,7 +69,21 @@ public final class Cluster {
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, null);
+    }
+
+    /**
+     * Create a new cluster with the given id, nodes, partitions and controller.
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(String clusterId,
+                   Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics,
+                   Set<String> internalTopics,
+                   Node controller) {
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, internalTopics, controller);
     }
 
     private Cluster(String clusterId,
@@ -76,7 +91,8 @@ public final class Cluster {
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
-                    Set<String> internalTopics) {
+                    Set<String> internalTopics,
+                    Node controller) {
         this.isBootstrapConfigured = isBootstrapConfigured;
         this.clusterResource = new ClusterResource(clusterId);
         // make a randomized, unmodifiable copy of the nodes
@@ -130,6 +146,7 @@ public final class Cluster {
 
         this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
         this.internalTopics = Collections.unmodifiableSet(internalTopics);
+        this.controller = controller;
     }
 
     /**
@@ -137,7 +154,7 @@ public final class Cluster {
      */
     public static Cluster empty() {
         return new Cluster(null, new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
-                Collections.<String>emptySet());
+                Collections.<String>emptySet(), null);
     }
 
     /**
@@ -150,7 +167,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet(), null);
     }
 
     /**
@@ -160,7 +177,7 @@ public final class Cluster {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
         return new Cluster(clusterResource.clusterId(), this.nodes, combinedPartitions.values(),
-                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics));
+                new HashSet<>(this.unauthorizedTopics), new HashSet<>(this.internalTopics), this.controller);
     }
 
     /**
@@ -265,6 +282,10 @@ public final class Cluster {
         return clusterResource;
     }
 
+    public Node controller() {
+        return controller;
+    }
+
     @Override
     public String toString() {
         return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

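[Editor's note: a sketch of the new constructor and accessor; the cluster id, host names and broker ids are made up, and imports are elided.]

    Node broker0 = new Node(0, "broker0.example.com", 9092);
    Node broker1 = new Node(1, "broker1.example.com", 9092);
    Cluster cluster = new Cluster("my-cluster-id",
        Arrays.asList(broker0, broker1),
        Collections.<PartitionInfo>emptyList(),
        Collections.<String>emptySet(),
        Collections.<String>emptySet(),
        broker1);                            // broker1 is the current controller
    Node controller = cluster.controller();  // broker1; null when unknown
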
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
new file mode 100644
index 0000000..3c51fbe
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ */
+public abstract class KafkaFuture<T> implements Future<T> {
+    /**
+     * A function which takes objects of type A and returns objects of type B.
+     */
+    public static abstract class Function<A, B> {
+        public abstract B apply(A a);
+    }
+
+    /**
+     * A consumer of two different types of object.
+     */
+    public static abstract class BiConsumer<A, B> {
+        public abstract void accept(A a, B b);
+    }
+
+    private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
+        private int remainingResponses;
+        private final KafkaFuture<Void> future;
+
+        public AllOfAdapter(int remainingResponses, KafkaFuture<Void> future) {
+            this.remainingResponses = remainingResponses;
+            this.future = future;
+        }
+
+        @Override
+        public synchronized void accept(R newValue, Throwable exception) {
+            if (remainingResponses <= 0)
+                return;
+            if (exception != null) {
+                remainingResponses = 0;
+                future.completeExceptionally(exception);
+            } else {
+                remainingResponses--;
+                if (remainingResponses <= 0)
+                    future.complete(null);
+            }
+        }
+    }
+
+    /** 
+     * Returns a new KafkaFuture that is already completed with the given value.
+     */
+    public static <U> KafkaFuture<U> completedFuture(U value) {
+        KafkaFuture<U> future = new KafkaFutureImpl<U>();
+        future.complete(value);
+        return future;
+    }
+
+    /** 
+     * Returns a new KafkaFuture that is completed when all the given futures have completed.  If
+     * any of the given futures completes exceptionally, so does the returned future, with the
+     * same exception.  If multiple futures fail, which exception is propagated is arbitrary.
+     */
+    public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
+        KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<Void>();
+        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<Object>(futures.length, allOfFuture);
+        for (KafkaFuture<?> future : futures) {
+            future.addWaiter(allOfWaiter);
+        }
+        return allOfFuture;
+    }
+
+    /**
+     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+     * future's result as the argument to the supplied function.
+     */
+    public abstract <R> KafkaFuture<R> thenApply(Function<T, R> function);
+
+    protected abstract void addWaiter(BiConsumer<? super T, ? super Throwable> action);
+
+    /**
+     * If not already completed, sets the value returned by get() and related methods to the given
+     * value.
+     */
+    protected abstract boolean complete(T newValue);
+
+    /**
+     * If not already completed, causes invocations of get() and related methods to throw the given
+     * exception.
+     */
+    protected abstract boolean completeExceptionally(Throwable newException);
+
+    /**
+     * If not already completed, completes this future with a CancellationException.  Dependent
+     * futures that have not already completed will also complete exceptionally, with a
+     * CompletionException caused by this CancellationException.
+     */
+    @Override
+    public abstract boolean cancel(boolean mayInterruptIfRunning);
+
+    /**
+     * Waits if necessary for this future to complete, and then returns its result.
+     */
+    @Override
+    public abstract T get() throws InterruptedException, ExecutionException;
+
+    /**
+     * Waits if necessary for at most the given time for this future to complete, and then returns
+     * its result, if available.
+     */
+    @Override
+    public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+        TimeoutException;
+
+    /**
+     * Returns the result value (or throws any encountered exception) if completed, else returns
+     * the given valueIfAbsent.
+     */
+    public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;
+
+    /**
+     * Returns true if this future was cancelled before it completed normally.
+     */
+    @Override
+    public abstract boolean isCancelled();
+
+    /**
+     * Returns true if this future completed exceptionally, in any way.
+     */
+    public abstract boolean isCompletedExceptionally();
+
+    /**
+     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+     */
+    @Override
+    public abstract boolean isDone();
+}

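[Editor's note: a sketch of the chaining and combining primitives. completedFuture stands in for a future returned by a real AdminClient call, and the wrapper method exists only so the checked exceptions have somewhere to go.]

    static int chainingExample() throws Exception {
        KafkaFuture<Integer> length =
            KafkaFuture.completedFuture("events").thenApply(
                new KafkaFuture.Function<String, Integer>() {
                    @Override
                    public Integer apply(String name) {
                        return name.length();
                    }
                });
        // allOf completes once every argument future has completed.
        KafkaFuture.allOf(length).get();
        return length.get();  // 6
    }
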
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
new file mode 100644
index 0000000..01355c6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * A flexible future which supports call chaining and other asynchronous programming patterns.
+ * This will eventually become a thin shim on top of Java 8's CompletableFuture.
+ */
+public class KafkaFutureImpl<T> extends KafkaFuture<T> {
+    /**
+     * A convenience method that throws the current exception, wrapping it if needed.
+     *
+     * In general, KafkaFuture throws CancellationException and InterruptedException directly, and
+     * wraps all other exceptions in an ExecutionException.
+     */
+    private static void wrapAndThrow(Throwable t) throws InterruptedException, ExecutionException {
+        if (t instanceof CancellationException) {
+            throw (CancellationException) t;
+        } else if (t instanceof InterruptedException) {
+            throw (InterruptedException) t;
+        } else {
+            throw new ExecutionException(t);
+        }
+    }
+
+    private static class Applicant<A, B> extends BiConsumer<A, Throwable> {
+        private final Function<A, B> function;
+        private final KafkaFutureImpl<B> future;
+
+        Applicant(Function<A, B> function, KafkaFutureImpl<B> future) {
+            this.function = function;
+            this.future = future;
+        }
+
+        @Override
+        public void accept(A a, Throwable exception) {
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                try {
+                    B b = function.apply(a);
+                    future.complete(b);
+                } catch (Throwable t) {
+                    future.completeExceptionally(t);
+                }
+            }
+        }
+    }
+
+    private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
+        private R value = null;
+        private Throwable exception = null;
+        private boolean done = false;
+
+        @Override
+        public synchronized void accept(R newValue, Throwable newException) {
+            this.value = newValue;
+            this.exception = newException;
+            this.done = true;
+            this.notifyAll();
+        }
+
+        synchronized R await() throws InterruptedException, ExecutionException {
+            while (true) {
+                if (exception != null)
+                    wrapAndThrow(exception);
+                if (done)
+                    return value;
+                this.wait();
+            }
+        }
+
+        R await(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException {
+            long startMs = System.currentTimeMillis();
+            long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
+            long delta = 0;
+            synchronized (this) {
+                while (true) {
+                    if (exception != null)
+                        wrapAndThrow(exception);
+                    if (done)
+                        return value;
+                    if (delta >= waitTimeMs) {  // avoid wait(0), which would block forever
+                        throw new TimeoutException();
+                    }
+                    this.wait(waitTimeMs - delta);
+                    delta = System.currentTimeMillis() - startMs;
+                }
+            }
+        }
+    }
+
+    /**
+     * True if this future is done.
+     */
+    private boolean done = false;
+
+    /**
+     * The value of this future, or null.  Protected by the object monitor.
+     */
+    private T value = null;
+
+    /**
+     * The exception associated with this future, or null.  Protected by the object monitor.
+     */
+    private Throwable exception = null;
+
+    /**
+     * A list of objects waiting for this future to complete (either successfully or
+     * exceptionally).  Protected by the object monitor.
+     */
+    private List<BiConsumer<? super T, ? super Throwable>> waiters = new ArrayList<>();
+
+    /**
+     * Returns a new KafkaFuture that, when this future completes normally, is executed with this
+     * future's result as the argument to the supplied function.
+     */
+    @Override
+    public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
+        KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
+        addWaiter(new Applicant<T, R>(function, future));
+        return future;
+    }
+
+    @Override
+    protected synchronized void addWaiter(BiConsumer<? super T, ? super Throwable> action) {
+        if (exception != null) {
+            action.accept(null, exception);
+        } else if (done) {
+            action.accept(value, null);
+        } else {
+            waiters.add(action);
+        }
+    }
+
+    @Override
+    public boolean complete(T newValue) {
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        synchronized (this) {
+            if (done)
+                return false;
+            value = newValue;
+            done = true;
+            oldWaiters = waiters;
+            waiters = null;
+        }
+        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+            waiter.accept(newValue, null);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean completeExceptionally(Throwable newException) {
+        List<BiConsumer<? super T, ? super Throwable>> oldWaiters = null;
+        synchronized (this) {
+            if (done)
+                return false;
+            exception = newException;
+            done = true;
+            oldWaiters = waiters;
+            waiters = null;
+        }
+        for (BiConsumer<? super T, ? super Throwable> waiter : oldWaiters) {
+            waiter.accept(null, newException);
+        }
+        return true;
+    }
+
+    /**
+     * If not already completed, completes this future with a CancellationException.  Dependent
+     * futures that have not already completed will also complete exceptionally, with a
+     * CompletionException caused by this CancellationException.
+     */
+    @Override
+    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
+        if (completeExceptionally(new CancellationException()))
+            return true;
+        return exception instanceof CancellationException;
+    }
+
+    /**
+     * Waits if necessary for this future to complete, and then returns its result.
+     */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        addWaiter(waiter);
+        return waiter.await();
+    }
+
+    /**
+     * Waits if necessary for at most the given time for this future to complete, and then returns
+     * its result, if available.
+     */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
+            TimeoutException {
+        SingleWaiter<T> waiter = new SingleWaiter<T>();
+        addWaiter(waiter);
+        return waiter.await(timeout, unit);
+    }
+
+    /**
+     * Returns the result value (or throws any encountered exception) if completed, else returns
+     * the given valueIfAbsent.
+     */
+    @Override
+    public synchronized T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException {
+        if (exception != null)
+            wrapAndThrow(exception);
+        if (done)
+            return value;
+        return valueIfAbsent;
+    }
+
+    /**
+     * Returns true if this future was cancelled before it completed normally.
+     */
+    @Override
+    public synchronized boolean isCancelled() {
+        return exception instanceof CancellationException;
+    }
+
+    /**
+     * Returns true if this future completed exceptionally, in any way.
+     */
+    @Override
+    public synchronized boolean isCompletedExceptionally() {
+        return exception != null;
+    }
+
+    /**
+     * Returns true if completed in any fashion: normally, exceptionally, or via cancellation.
+     */
+    @Override
+    public synchronized boolean isDone() {
+        return done;
+    }
+}

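[Editor's note: a sketch of the internal side of the contract: how a response handler might fail a future and how that surfaces to callers. KafkaFutureImpl lives in the internals package, so this is not end-user API; the exception and method name are illustrative.]

    static void completionExample() throws InterruptedException {
        KafkaFutureImpl<String> future = new KafkaFutureImpl<String>();
        future.completeExceptionally(new RuntimeException("request timed out"));
        try {
            future.getNow(null);
        } catch (ExecutionException e) {
            // Non-cancellation, non-interrupt causes are wrapped in
            // ExecutionException by wrapAndThrow().
            System.out.println("failed: " + e.getCause());
        }
    }
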
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 303d76f..0e5ca78 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.KafkaException;
 /**
  * A ChannelBuilder interface to build Channel based on configs
  */
-public interface ChannelBuilder {
+public interface ChannelBuilder extends AutoCloseable {
 
     /**
      * Configure this class with the given key-value pairs

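[Editor's note: the point of extending AutoCloseable here (and for Selector below) is that these objects can sit in try-with-resources blocks. A sketch, assuming the existing ClientUtils.createChannelBuilder(AbstractConfig) helper and a config object supplied by the caller; imports elided.]

    static void withChannelBuilder(AbstractConfig config) throws Exception {
        // channelBuilder.close() now runs even if construction below fails.
        try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config)) {
            // ... build a Selector / NetworkClient on top of channelBuilder ...
        }
    }
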
http://git-wip-us.apache.org/repos/asf/kafka/blob/4aed28d1/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8dd3ad6..312e1f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory;
  *
  * This class is not thread safe!
  */
-public class Selector implements Selectable {
+public class Selector implements Selectable, AutoCloseable {
 
     public static final long NO_IDLE_TIMEOUT_MS = -1;
     private static final Logger log = LoggerFactory.getLogger(Selector.class);