You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/09/02 20:55:25 UTC

[3/3] kafka git commit: KAFKA-2411; remove usage of blocking channel

KAFKA-2411; remove usage of blocking channel

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>, Gwen Shapira <cs...@gmail.com>

Closes #151 from ijuma/kafka-2411-remove-usage-of-blocking-channel


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d02ca36c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d02ca36c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d02ca36c

Branch: refs/heads/trunk
Commit: d02ca36ca1cccdb6962191b97f54ce96b9d75abc
Parents: d0adf6a
Author: Ismael Juma <is...@juma.me.uk>
Authored: Wed Sep 2 11:55:08 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Sep 2 11:55:08 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |  23 +-
 .../kafka/clients/ClusterConnectionStates.java  |  13 +-
 .../org/apache/kafka/clients/KafkaClient.java   |  18 +-
 .../kafka/clients/ManualMetadataUpdater.java    |  76 +++++
 .../apache/kafka/clients/MetadataUpdater.java   |  72 ++++
 .../org/apache/kafka/clients/NetworkClient.java | 318 ++++++++++++------
 .../errors/BrokerNotAvailableException.java     |  32 ++
 .../common/errors/ControllerMovedException.java |  32 ++
 .../kafka/common/network/ChannelBuilders.java   |  52 +++
 .../kafka/common/network/NetworkReceive.java    |  10 -
 .../apache/kafka/common/network/Selectable.java |  20 +-
 .../apache/kafka/common/network/Selector.java   |   8 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   4 +-
 .../apache/kafka/common/protocol/Errors.java    |   6 +-
 .../kafka/common/protocol/ProtoUtils.java       |   6 +
 .../apache/kafka/common/protocol/Protocol.java  | 111 ++++++-
 .../kafka/common/requests/AbstractRequest.java  |   6 +
 .../requests/ControlledShutdownRequest.java     |  69 ++++
 .../requests/ControlledShutdownResponse.java    |  91 ++++++
 .../common/requests/LeaderAndIsrRequest.java    | 212 ++++++++++++
 .../common/requests/LeaderAndIsrResponse.java   | 105 ++++++
 .../common/requests/UpdateMetadataRequest.java  | 291 +++++++++++++++++
 .../common/requests/UpdateMetadataResponse.java |  59 ++++
 .../org/apache/kafka/clients/MockClient.java    |  10 +
 .../apache/kafka/clients/NetworkClientTest.java |  23 +-
 .../common/requests/RequestResponseTest.java    | 119 ++++++-
 .../org/apache/kafka/test/MockSelector.java     |   4 +
 .../kafka/api/ControlledShutdownRequest.scala   |  19 +-
 .../controller/ControllerChannelManager.scala   | 327 ++++++++++++-------
 .../kafka/controller/KafkaController.scala      |  46 ++-
 .../controller/PartitionStateMachine.scala      |   4 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   2 +-
 .../kafka/controller/TopicDeletionManager.scala |  20 +-
 .../scala/kafka/network/BlockingChannel.scala   |  10 +-
 .../main/scala/kafka/network/SocketServer.scala |  13 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 184 +++++++++--
 .../kafka/utils/NetworkClientBlockingOps.scala  | 142 ++++++++
 .../controller/ControllerFailoverTest.scala     |  18 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  38 ++-
 39 files changed, 2229 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index ba3bcbe..e7514f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -19,10 +19,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.network.SSLChannelBuilder;
-import org.apache.kafka.common.network.PlaintextChannelBuilder;
 import org.apache.kafka.common.security.ssl.SSLFactory;
 import org.apache.kafka.common.config.ConfigException;
 import org.slf4j.Logger;
@@ -71,25 +70,13 @@ public class ClientUtils {
 
     /**
      * @param configs client/server configs
-     * returns ChannelBuilder configured channelBuilder based on the configs.
+     * @return configured ChannelBuilder based on the configs.
      */
     public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
         SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-        ChannelBuilder channelBuilder = null;
-
-        switch (securityProtocol) {
-            case SSL:
-                channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT);
-                break;
-            case PLAINTEXT:
-                channelBuilder = new PlaintextChannelBuilder();
-                break;
-            default:
-                throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
-        }
-
-        channelBuilder.configure(configs);
-        return channelBuilder;
+        if (securityProtocol != SecurityProtocol.SSL && securityProtocol != SecurityProtocol.PLAINTEXT)
+            throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+        return ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 9ebda5e..6c58211 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -120,6 +120,17 @@ final class ClusterConnectionStates {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.DISCONNECTED;
     }
+
+    /**
+     * Remove the given node from the tracked connection states. The main difference between this and `disconnected`
+     * is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken
+     * into account after `disconnected` is called.
+     *
+     * @param id The connection to remove
+     */
+    public void remove(String id) {
+        nodeState.remove(id);
+    }
     
     /**
      * Get the state of a given connection
@@ -158,4 +169,4 @@ final class ClusterConnectionStates {
             return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 7ab2503..f46c0d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -81,6 +81,13 @@ public interface KafkaClient extends Closeable {
     public List<ClientResponse> poll(long timeout, long now);
 
     /**
+     * Closes the connection to a particular node (if there is one).
+     *
+     * @param nodeId The id of the node
+     */
+    public void close(String nodeId);
+
+    /**
      * Complete all in-flight requests for a given connection
      * 
      * @param id The connection to complete requests for
@@ -127,8 +134,17 @@ public interface KafkaClient extends Closeable {
     public RequestHeader nextRequestHeader(ApiKeys key);
 
     /**
+     * Generate a request header for the given API key and version
+     *
+     * @param key The api key
+     * @param version The api version
+     * @return A request header with the appropriate client id and correlation id
+     */
+    public RequestHeader nextRequestHeader(ApiKeys key, short version);
+
+    /**
      * Wake up the client if it is currently blocked waiting for I/O
      */
     public void wakeup();
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
new file mode 100644
index 0000000..efbe664
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A simple implementation of `MetadataUpdater` that returns the cluster nodes set via the constructor or via
+ * `setNodes`. It never starts a metadata update by itself and ignores `requestUpdate`.
+ *
+ * This is useful in cases where automatic metadata updates are not required. An example is controller/broker
+ * communication.
+ *
+ * This class is not thread-safe!
+ */
+public class ManualMetadataUpdater implements MetadataUpdater {
+
+    private List<Node> nodes;
+
+    public ManualMetadataUpdater() {
+        this(new ArrayList<Node>(0));
+    }
+
+    public ManualMetadataUpdater(List<Node> nodes) {
+        this.nodes = nodes; // the list is stored by reference, it is not copied here
+    }
+
+    public void setNodes(List<Node> nodes) {
+        this.nodes = nodes; // replaces the previous list; stored by reference, not copied
+    }
+
+    @Override
+    public List<Node> fetchNodes() {
+        return new ArrayList<>(nodes); // hand out a copy so callers cannot modify the internal list
+    }
+
+    @Override
+    public boolean isUpdateDue(long now) {
+        return false; // an update is never due
+    }
+
+    @Override
+    public long maybeUpdate(long now) {
+        return Long.MAX_VALUE; // no update is started, so report the largest possible time until the next one
+    }
+
+    @Override
+    public boolean maybeHandleDisconnection(ClientRequest request) {
+        return false; // never handled here
+    }
+
+    @Override
+    public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) {
+        return false; // never handled here
+    }
+
+    @Override
+    public void requestUpdate() {
+        // Do nothing: the nodes only change via the constructor or `setNodes`
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
new file mode 100644
index 0000000..4669a68
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.List;
+
+/**
+ * The interface used by `NetworkClient` to request cluster metadata info to be updated and to retrieve the cluster nodes
+ * from such metadata. This is an internal interface.
+ * <p>
+ * Implementations should not be assumed to be thread-safe!
+ */
+interface MetadataUpdater {
+
+    /**
+     * Gets the nodes of the current cluster info without blocking.
+     */
+    List<Node> fetchNodes();
+
+    /**
+     * Returns true if an update to the cluster metadata info is due.
+     */
+    boolean isUpdateDue(long now);
+
+    /**
+     * Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
+     * be 0 if an update has been started as a result of this call).
+     *
+     * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to
+     * `maybeHandleCompletedReceive`.
+     *
+     * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
+     * factors like node availability, how long since the last metadata update, etc.
+     */
+    long maybeUpdate(long now);
+
+    /**
+     * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
+     *
+     * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
+     * requests with special handling for disconnections of such requests.
+     */
+    boolean maybeHandleDisconnection(ClientRequest request);
+
+    /**
+     * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`.
+     *
+     * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
+     * requests with special handling for completed receives of such requests.
+     */
+    boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body);
+
+    /**
+     * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the
+     * start of the update if possible (see `maybeUpdate` for more information).
+     */
+    void requestUpdate();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b31f7f1..0a6f952 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -48,9 +48,8 @@ public class NetworkClient implements KafkaClient {
 
     /* the selector used to perform network i/o */
     private final Selectable selector;
-
-    /* the current cluster metadata */
-    private final Metadata metadata;
+    
+    private final MetadataUpdater metadataUpdater;
 
     /* the state of each node's connection */
     private final ClusterConnectionStates connectionStates;
@@ -73,12 +72,6 @@ public class NetworkClient implements KafkaClient {
     /* the current correlation id to use when sending requests to servers */
     private int correlation;
 
-    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
-    private boolean metadataFetchInProgress;
-
-    /* the last timestamp when no broker node is available to connect */
-    private long lastNoNodeAvailableMs;
-
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -86,8 +79,43 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer) {
+        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer);
+    }
+
+    public NetworkClient(Selectable selector,
+                         MetadataUpdater metadataUpdater,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer) {
+        this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
+                socketSendBuffer, socketReceiveBuffer);
+    }
+
+    private NetworkClient(MetadataUpdater metadataUpdater,
+                          Metadata metadata,
+                          Selectable selector,
+                          String clientId,
+                          int maxInFlightRequestsPerConnection,
+                          long reconnectBackoffMs,
+                          int socketSendBuffer,
+                          int socketReceiveBuffer) {
+
+        /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
+         * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
+         * super constructor is invoked.
+         */
+        if (metadataUpdater == null) {
+            if (metadata == null)
+                throw new IllegalArgumentException("`metadata` must not be null");
+            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
+        } else {
+            this.metadataUpdater = metadataUpdater;
+        }
+
         this.selector = selector;
-        this.metadata = metadata;
         this.clientId = clientId;
         this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
         this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
@@ -95,8 +123,6 @@ public class NetworkClient implements KafkaClient {
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
         this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
-        this.metadataFetchInProgress = false;
-        this.lastNoNodeAvailableMs = 0;
     }
 
     /**
@@ -119,6 +145,17 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Closes the connection to a particular node (if there is one).
+     *
+     * @param nodeId The id of the node
+     */
+    @Override
+    public void close(String nodeId) {
+        selector.close(nodeId);
+        connectionStates.remove(nodeId);
+    }
+
+    /**
      * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
      * connections.
@@ -154,14 +191,9 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean isReady(Node node, long now) {
-        String nodeId = node.idString();
-        if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
-            // if we need to update our metadata now declare all requests unready to make metadata requests first
-            // priority
-            return false;
-        else
-            // otherwise we are ready if we are connected and can send more requests
-            return canSendRequest(nodeId);
+        // if a metadata update is due, declare all requests unready so that the metadata request takes priority;
+        // otherwise we are ready if we are connected and can send more requests
+        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
     }
 
     /**
@@ -193,7 +225,10 @@ public class NetworkClient implements KafkaClient {
         String nodeId = request.request().destination();
         if (!canSendRequest(nodeId))
             throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
+        doSend(request);
+    }
 
+    private void doSend(ClientRequest request) {
         this.inFlightRequests.add(request);
         selector.send(request.request());
     }
@@ -207,16 +242,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
-        // should we update our metadata?
-        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
-        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
-        // if there is no node available to connect, back off refreshing metadata
-        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
-                                        waitForMetadataFetch);
-        if (metadataTimeout == 0)
-            maybeUpdateMetadata(now);
-        // do the I/O
+        long metadataTimeout = metadataUpdater.maybeUpdate(now);
         try {
             this.selector.poll(Math.min(timeout, metadataTimeout));
         } catch (IOException e) {
@@ -224,7 +250,7 @@ public class NetworkClient implements KafkaClient {
         }
 
         // process completed actions
-        List<ClientResponse> responses = new ArrayList<ClientResponse>();
+        List<ClientResponse> responses = new ArrayList<>();
         handleCompletedSends(responses, now);
         handleCompletedReceives(responses, now);
         handleDisconnections(responses, now);
@@ -304,6 +330,18 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Generate a request header for the given API key and version
+     *
+     * @param key The api key
+     * @param version The api version
+     * @return A request header with the appropriate client id and correlation id
+     */
+    @Override
+    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
+        return new RequestHeader(key.id, version, clientId, correlation++);
+    }
+
+    /**
      * Interrupt the client if it is blocked waiting on I/O.
      */
     @Override
@@ -327,8 +365,9 @@ public class NetworkClient implements KafkaClient {
      *
      * @return The node with the fewest in-flight requests.
      */
+    @Override
     public Node leastLoadedNode(long now) {
-        List<Node> nodes = this.metadata.fetch().nodes();
+        List<Node> nodes = this.metadataUpdater.fetchNodes();
         int inflight = Integer.MAX_VALUE;
         Node found = null;
         for (int i = 0; i < nodes.size(); i++) {
@@ -378,30 +417,8 @@ public class NetworkClient implements KafkaClient {
             short apiKey = req.request().header().apiKey();
             Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
             correlate(req.request().header(), header);
-            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
-                handleMetadataResponse(req.request().header(), body, now);
-            } else {
-                // need to add body/header to response here
+            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                 responses.add(new ClientResponse(req, now, false, body));
-            }
-        }
-    }
-
-    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
-        this.metadataFetchInProgress = false;
-        MetadataResponse response = new MetadataResponse(body);
-        Cluster cluster = response.cluster();
-        // check if any topics metadata failed to get updated
-        if (response.errors().size() > 0) {
-            log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
-        }
-        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
-        // created which means we will get errors and no nodes until it exists
-        if (cluster.nodes().size() > 0) {
-            this.metadata.update(cluster, now);
-        } else {
-            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
-            this.metadata.failedUpdate(now);
         }
     }
 
@@ -417,16 +434,13 @@ public class NetworkClient implements KafkaClient {
             log.debug("Node {} disconnected.", node);
             for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
                 log.trace("Cancelled request {} due to node {} being disconnected", request, node);
-                ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
-                if (requestKey == ApiKeys.METADATA)
-                    metadataFetchInProgress = false;
-                else
+                if (!metadataUpdater.maybeHandleDisconnection(request))
                     responses.add(new ClientResponse(request, now, true, null));
             }
         }
         // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
         if (this.selector.disconnected().size() > 0)
-            this.metadata.requestUpdate();
+            metadataUpdater.requestUpdate();
     }
 
     /**
@@ -449,52 +463,6 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Create a metadata request for the given topics
-     */
-    private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
-        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
-        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
-        return new ClientRequest(now, true, send, null, true);
-    }
-
-    /**
-     * Add a metadata request to the list of sends if we can make one
-     */
-    private void maybeUpdateMetadata(long now) {
-        // Beware that the behavior of this method and the computation of timeouts for poll() are
-        // highly dependent on the behavior of leastLoadedNode.
-        Node node = this.leastLoadedNode(now);
-        if (node == null) {
-            log.debug("Give up sending metadata request since no node is available");
-            // mark the timestamp for no node available to connect
-            this.lastNoNodeAvailableMs = now;
-            return;
-        }
-        String nodeConnectionId = node.idString();
-
-        if (canSendRequest(nodeConnectionId)) {
-            Set<String> topics = metadata.topics();
-            this.metadataFetchInProgress = true;
-            ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);
-            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-            this.selector.send(metadataRequest.request());
-            this.inFlightRequests.add(metadataRequest);
-        } else if (connectionStates.canConnect(nodeConnectionId, now)) {
-            // we don't have a connection to this node right now, make one
-            log.debug("Initialize connection to node {} for sending metadata request", node.id());
-            initiateConnect(node, now);
-            // If initiateConnect failed immediately, this node will be put into blackout and we
-            // should allow immediately retrying in case there is another candidate node. If it
-            // is still connecting, the worst case is that we end up setting a longer timeout
-            // on the next round and then wait for the response.
-        } else { // connected, but can't send more OR connecting
-            // In either case, we just need to wait for a network event to let us know the selected
-            // connection might be usable again.
-            this.lastNoNodeAvailableMs = now;
-        }
-    }
-
-    /**
      * Initiate a connection to the given node
      */
     private void initiateConnect(Node node, long now) {
@@ -510,9 +478,145 @@ public class NetworkClient implements KafkaClient {
             /* attempt failed, we'll try again after the backoff */
             connectionStates.disconnected(nodeConnectionId);
             /* maybe the problem is our metadata, update it */
-            metadata.requestUpdate();
+            metadataUpdater.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
         }
     }
 
+    class DefaultMetadataUpdater implements MetadataUpdater {
+
+        /* the current cluster metadata */
+        private final Metadata metadata;
+
+        /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
+        private boolean metadataFetchInProgress;
+
+        /* the last timestamp when no broker node is available to connect */
+        private long lastNoNodeAvailableMs;
+
+        DefaultMetadataUpdater(Metadata metadata) {
+            this.metadata = metadata;
+            this.metadataFetchInProgress = false;
+            this.lastNoNodeAvailableMs = 0;
+        }
+
+        @Override
+        public List<Node> fetchNodes() {
+            return metadata.fetch().nodes();
+        }
+
+        @Override
+        public boolean isUpdateDue(long now) {
+            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
+        }
+
+        @Override
+        public long maybeUpdate(long now) {
+            // should we update our metadata?
+            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
+            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
+            // if there is no node available to connect, back off refreshing metadata
+            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
+                    waitForMetadataFetch);
+
+            if (metadataTimeout == 0) {
+                // Beware that the behavior of this method and the computation of timeouts for poll() are
+                // highly dependent on the behavior of leastLoadedNode.
+                Node node = leastLoadedNode(now);
+                maybeUpdate(now, node);
+            }
+
+            return metadataTimeout;
+        }
+
+        @Override
+        public boolean maybeHandleDisconnection(ClientRequest request) {
+            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
+
+            if (requestKey == ApiKeys.METADATA) {
+                metadataFetchInProgress = false;
+                return true;
+            }
+
+            return false;
+        }
+
+        @Override
+        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
+            short apiKey = req.request().header().apiKey();
+            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
+                handleResponse(req.request().header(), body, now);
+                return true;
+            }
+            return false;
+        }
+
+        @Override
+        public void requestUpdate() {
+            this.metadata.requestUpdate();
+        }
+
+        private void handleResponse(RequestHeader header, Struct body, long now) {
+            this.metadataFetchInProgress = false;
+            MetadataResponse response = new MetadataResponse(body);
+            Cluster cluster = response.cluster();
+            // check if any topics metadata failed to get updated
+            if (response.errors().size() > 0) {
+                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
+            }
+            // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+            // created which means we will get errors and no nodes until it exists
+            if (cluster.nodes().size() > 0) {
+                this.metadata.update(cluster, now);
+            } else {
+                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+                this.metadata.failedUpdate(now);
+            }
+        }
+
+        /**
+         * Create a metadata request for the given topics
+         */
+        private ClientRequest request(long now, String node, Set<String> topics) {
+            MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics));
+            RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
+            return new ClientRequest(now, true, send, null, true);
+        }
+
+        /**
+         * Add a metadata request to the list of sends if we can make one
+         */
+        private void maybeUpdate(long now, Node node) {
+            if (node == null) {
+                log.debug("Give up sending metadata request since no node is available");
+                // mark the timestamp for no node available to connect
+                this.lastNoNodeAvailableMs = now;
+                return;
+            }
+            String nodeConnectionId = node.idString();
+
+            if (canSendRequest(nodeConnectionId)) {
+                Set<String> topics = metadata.topics();
+                this.metadataFetchInProgress = true;
+                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);
+                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+                doSend(metadataRequest);
+            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
+                // we don't have a connection to this node right now, make one
+                log.debug("Initialize connection to node {} for sending metadata request", node.id());
+                initiateConnect(node, now);
+                // If initiateConnect failed immediately, this node will be put into blackout and we
+                // should allow immediately retrying in case there is another candidate node. If it
+                // is still connecting, the worst case is that we end up setting a longer timeout
+                // on the next round and then wait for the response.
+            } else { // connected, but can't send more OR connecting
+                // In either case, we just need to wait for a network event to let us know the selected
+                // connection might be usable again.
+                this.lastNoNodeAvailableMs = now;
+            }
+        }
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
new file mode 100644
index 0000000..f78f061
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class BrokerNotAvailableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public BrokerNotAvailableException(String message) {
+        super(message);
+    }
+
+    public BrokerNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
new file mode 100644
index 0000000..8dd7487
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class ControllerMovedException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ControllerMovedException(String message) {
+        super(message);
+    }
+
+    public ControllerMovedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
new file mode 100644
index 0000000..2332d3f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.ssl.SSLFactory;
+
+import java.util.Map;
+
+public class ChannelBuilders {
+
+    private ChannelBuilders() { }
+
+    /**
+     * @param securityProtocol the securityProtocol
+     * @param mode the SSL mode, it must be non-null if `securityProtocol` is `SSL` and it is ignored otherwise
+     * @param configs client/server configs
+     * @return the configured `ChannelBuilder`
+     * @throws IllegalArgumentException if the `mode` invariant described above is not maintained
+     */
+    public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactory.Mode mode, Map<String, ?> configs) {
+        ChannelBuilder channelBuilder = null;
+
+        switch (securityProtocol) {
+            case SSL:
+                if (mode == null)
+                    throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `SSL`");
+                channelBuilder = new SSLChannelBuilder(mode);
+                break;
+            case PLAINTEXT:
+            case TRACE:
+                channelBuilder = new PlaintextChannelBuilder();
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
+        }
+
+        channelBuilder.configure(configs);
+        return channelBuilder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 2a1568e..409775c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -107,14 +107,4 @@ public class NetworkReceive implements Receive {
         return this.buffer;
     }
 
-    // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel
-    @Deprecated
-    public long readCompletely(ReadableByteChannel channel) throws IOException {
-        int totalRead = 0;
-        while (!complete()) {
-            totalRead += readFromReadableChannel(channel);
-        }
-        return totalRead;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 39eae4a..70e74bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -23,6 +23,11 @@ import java.util.List;
 public interface Selectable {
 
     /**
+     * See {@link #connect(String, InetSocketAddress, int, int) connect()}
+     */
+    public static final int USE_DEFAULT_BUFFER_SIZE = -1;
+
+    /**
      * Begin establishing a socket connection to the given address identified by the given address
      * @param id The id for this connection
      * @param address The address to connect to
@@ -48,7 +53,12 @@ public interface Selectable {
     public void close();
 
     /**
-     * Queue the given request for sending in the subsequent {@poll(long)} calls
+     * Close the connection identified by the given id
+     */
+    public void close(String nodeId);
+
+    /**
+     * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
      * @param send The request to send
      */
     public void send(Send send);
@@ -61,23 +71,23 @@ public interface Selectable {
     public void poll(long timeout) throws IOException;
 
     /**
-     * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
+     * The list of sends that completed on the last {@link #poll(long) poll()} call.
      */
     public List<Send> completedSends();
 
     /**
-     * The list of receives that completed on the last {@link #poll(long, List) poll()} call.
+     * The list of receives that completed on the last {@link #poll(long) poll()} call.
      */
     public List<NetworkReceive> completedReceives();
 
     /**
-     * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()}
+     * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
      * call.
      */
     public List<String> disconnected();
 
     /**
-     * The list of connections that completed their connection on the last {@link #poll(long, List) poll()}
+     * The list of connections that completed their connection on the last {@link #poll(long) poll()}
      * call.
      */
     public List<String> connected();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index f49d54c..4aa5cbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -146,8 +146,10 @@ public class Selector implements Selectable {
         socketChannel.configureBlocking(false);
         Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
-        socket.setSendBufferSize(sendBufferSize);
-        socket.setReceiveBufferSize(receiveBufferSize);
+        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+            socket.setSendBufferSize(sendBufferSize);
+        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
+            socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
         try {
             socketChannel.connect(address);
@@ -182,7 +184,7 @@ public class Selector implements Selectable {
      */
     @Override
     public void disconnect(String id) {
-        KafkaChannel channel = channelForId(id);
+        KafkaChannel channel = this.channels.get(id);
         if (channel != null)
             channel.disconnect();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b39e9bb..46ddddb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -49,7 +49,7 @@ public enum ApiKeys {
         MAX_API_KEY = maxKey;
     }
 
-    /** the perminant and immutable id of an API--this can't change ever */
+    /** the permanent and immutable id of an API--this can't change ever */
     public final short id;
 
     /** an english description of the api--this is for debugging and can change */
@@ -63,4 +63,4 @@ public enum ApiKeys {
     public static ApiKeys forId(int id) {
         return codeToType[id];
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index e17e390..641afa1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.common.errors.*;
+import org.apache.kafka.common.errors.ControllerMovedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +46,14 @@ public enum Errors {
             new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
     REQUEST_TIMED_OUT(7,
             new TimeoutException("The request timed out.")),
-    // TODO: errorCode 8 for BrokerNotAvailable
+    BROKER_NOT_AVAILABLE(8,
+            new BrokerNotAvailableException("The broker is not available.")),
     REPLICA_NOT_AVAILABLE(9,
             new ApiException("The replica is not available for the requested topic-partition")),
     MESSAGE_TOO_LARGE(10,
             new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    STALE_CONTROLLER_EPOCH(11,
+            new ControllerMovedException("The controller moved to another broker.")),
     OFFSET_METADATA_TOO_LARGE(12,
             new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index c2cbbbd..85357ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -29,6 +29,8 @@ public class ProtoUtils {
         Schema[] versions = schemas[apiKey];
         if (version < 0 || version > versions.length)
             throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version);
+        if (versions[version] == null)
+            throw new IllegalArgumentException("Unsupported version for API key " + apiKey + ": " + version);
         return versions[version];
     }
 
@@ -62,4 +64,8 @@ public class ProtoUtils {
         return (Struct) currentResponseSchema(apiKey).read(buffer);
     }
 
+    public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) {
+        return (Struct) responseSchema(apiKey, version).read(buffer);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index a951e90..b72db4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -41,7 +41,7 @@ public class Protocol {
 
     public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
                                                                           new ArrayOf(STRING),
-                                                                          "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+                                                                          "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
 
     public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, "The hostname of the broker."),
@@ -396,6 +396,25 @@ public class Protocol {
     public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
     public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
 
+    /* Controlled shutdown api */
+    public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
+                                                                                     INT32,
+                                                                                     "The id of the broker for which controlled shutdown has been requested."));
+
+    public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V1 = new Schema(new Field("topic", STRING),
+                                                                             new Field("partition",
+                                                                                       INT32,
+                                                                                       "Topic partition id."));
+
+    public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = new Schema(new Field("error_code", INT16),
+                                                                            new Field("partitions_remaining",
+                                                                                      new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V1),
+                                                                                      "The partitions that the broker still leads."));
+
+    /* V0 is not supported as it would require changes to the request header not to include `clientId` */
+    public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1};
+    public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1};
+
     /* Join group api */
     public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
                                                                             STRING,
@@ -442,6 +461,39 @@ public class Protocol {
     public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
     public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
+    /* Leader and ISR api */
+    public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 =
+            new Schema(new Field("topic", STRING, "Topic name."),
+                       new Field("partition", INT32, "Topic partition id."),
+                       new Field("controller_epoch", INT32, "The controller epoch."),
+                       new Field("leader", INT32, "The broker id for the leader."),
+                       new Field("leader_epoch", INT32, "The leader epoch."),
+                       new Field("isr", new ArrayOf(INT32), "The in sync replica ids."),
+                       new Field("zk_version", INT32, "The ZK version."),
+                       new Field("replicas", new ArrayOf(INT32), "The replica ids."));
+
+    public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 =
+            new Schema(new Field("id", INT32, "The broker id."),
+                       new Field("host", STRING, "The hostname of the broker."),
+                       new Field("port", INT32, "The port on which the broker accepts requests."));
+
+    public static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+                                                                      new Field("controller_epoch", INT32, "The controller epoch."),
+                                                                      new Field("partition_states",
+                                                                                new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)),
+                                                                      new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0)));
+
+    public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                                 new Field("partition", INT32, "Topic partition id."),
+                                                                                 new Field("error_code", INT16, "Error code."));
+
+    public static final Schema LEADER_AND_ISR_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."),
+                                                                       new Field("partitions",
+                                                                                 new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0)));
+
+    public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0};
+    public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0};
+
     /* Replica api */
     public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."),
                                                                               new Field("partition", INT32, "Topic partition id."));
@@ -465,7 +517,50 @@ public class Protocol {
     public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0};
     public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0};
 
-    /* an array of all requests and responses with all schema versions */
+    /* Update metadata api */
+
+    public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V0 = LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0;
+
+    public static final Schema UPDATE_METADATA_REQUEST_BROKER_V0 =
+            new Schema(new Field("id", INT32, "The broker id."),
+                       new Field("host", STRING, "The hostname of the broker."),
+                       new Field("port", INT32, "The port on which the broker accepts requests."));
+
+    public static final Schema UPDATE_METADATA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."),
+                                                                       new Field("controller_epoch", INT32, "The controller epoch."),
+                                                                       new Field("partition_states",
+                                                                                 new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V0)),
+                                                                       new Field("live_brokers",
+                                                                                 new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V0)));
+
+    public static final Schema UPDATE_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."));
+
+    public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V1 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V0;
+
+    public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V1 =
+            // for some reason, V1 sends `port` before `host` while V0 sends `host` before `port`
+            new Schema(new Field("port", INT32, "The port on which the broker accepts requests."),
+                       new Field("host", STRING, "The hostname of the broker."),
+                       new Field("security_protocol_type", INT16, "The security protocol type."));
+
+    public static final Schema UPDATE_METADATA_REQUEST_BROKER_V1 =
+            new Schema(new Field("id", INT32, "The broker id."),
+                       new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V1)));
+
+    public static final Schema UPDATE_METADATA_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."),
+                                                                       new Field("controller_epoch", INT32, "The controller epoch."),
+                                                                       new Field("partition_states",
+                                                                                 new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V1)),
+                                                                       new Field("live_brokers",
+                                                                                 new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V1)));
+
+    public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
+
+    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
+    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
+
+    /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
+     * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
 
@@ -477,10 +572,10 @@ public class Protocol {
         REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
         REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
         REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
-        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST;
         REQUESTS[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_REQUEST;
-        REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
-        REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+        REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_REQUEST;
+        REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
@@ -492,10 +587,10 @@ public class Protocol {
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
         RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
         RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
-        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+        RESPONSES[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_RESPONSE;
         RESPONSES[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_RESPONSE;
-        RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
-        RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+        RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e316957..a696e80 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -57,6 +57,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return HeartbeatRequest.parse(buffer, versionId);
             case STOP_REPLICA:
                 return StopReplicaRequest.parse(buffer, versionId);
+            case CONTROLLED_SHUTDOWN_KEY:
+                return ControlledShutdownRequest.parse(buffer, versionId);
+            case UPDATE_METADATA_KEY:
+                return UpdateMetadataRequest.parse(buffer, versionId);
+            case LEADER_AND_ISR:
+                return LeaderAndIsrRequest.parse(buffer, versionId);
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
new file mode 100644
index 0000000..57f51d8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+/**
+ * Request for the {@code CONTROLLED_SHUTDOWN_KEY} API. Its only payload is the id of the broker the request refers
+ * to, stored in the {@code broker_id} field.
+ */
+public class ControlledShutdownRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+
+    private static final String BROKER_ID_KEY_NAME = "broker_id";
+
+    // Assigned exactly once by either constructor.
+    private final int brokerId;
+
+    /**
+     * Creates a request backed by a new struct of the current schema version.
+     *
+     * @param brokerId value written to the {@code broker_id} field
+     */
+    public ControlledShutdownRequest(int brokerId) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(BROKER_ID_KEY_NAME, brokerId);
+        this.brokerId = brokerId;
+    }
+
+    /**
+     * Wraps an already populated struct and reads the broker id out of it.
+     *
+     * @param struct struct containing a {@code broker_id} field
+     */
+    public ControlledShutdownRequest(Struct struct) {
+        super(struct);
+        brokerId = struct.getInt(BROKER_ID_KEY_NAME);
+    }
+
+    /**
+     * Builds the response that reports {@code e} as the outcome of this request.
+     *
+     * @param versionId version of the request being answered
+     * @param e the failure to translate into an error code
+     * @return for version 1, a {@link ControlledShutdownResponse} carrying the error code of {@code e} and an empty
+     *         set of remaining partitions
+     * @throws IllegalArgumentException for version 0 (not supported by this class) and for any unknown version
+     */
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                // The message is a constant, so it is passed directly instead of going through String.format.
+                throw new IllegalArgumentException("Version 0 is not supported. It is only supported by " +
+                        "the Scala request class for controlled shutdown");
+            case 1:
+                return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.<TopicPartition>emptySet());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)));
+        }
+    }
+
+    /**
+     * @return the broker id carried by this request
+     */
+    public int brokerId() {
+        return brokerId;
+    }
+
+    /**
+     * Parses a request that was serialized with the schema of the given version.
+     *
+     * @param buffer buffer positioned at the start of the request body
+     * @param versionId schema version used to read the buffer
+     * @return the parsed request
+     */
+    public static ControlledShutdownRequest parse(ByteBuffer buffer, int versionId) {
+        return new ControlledShutdownRequest(ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, versionId, buffer));
+    }
+
+    /**
+     * Parses a request that was serialized with the current schema version.
+     *
+     * @param buffer buffer positioned at the start of the request body
+     * @return the parsed request
+     */
+    public static ControlledShutdownRequest parse(ByteBuffer buffer) {
+        return new ControlledShutdownRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
new file mode 100644
index 0000000..15d600d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Response for the {@code CONTROLLED_SHUTDOWN_KEY} API: a top-level error code plus the set of partitions reported
+ * in the {@code partitions_remaining} field.
+ */
+public class ControlledShutdownResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining";
+
+    // key names of the entries in partitions_remaining
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+
+    /**
+     * Possible error codes:
+     *
+     * UNKNOWN(-1) (this is because IllegalStateException may be thrown in `KafkaController.shutdownBroker`, it would be good to improve this)
+     * BROKER_NOT_AVAILABLE(8)
+     * STALE_CONTROLLER_EPOCH(11)
+     */
+    private final short errorCode;
+
+    private final Set<TopicPartition> partitionsRemaining;
+
+    /**
+     * Creates a response backed by a new struct of the current schema version.
+     *
+     * @param errorCode value written to the {@code error_code} field
+     * @param partitionsRemaining partitions serialized, one struct each, into {@code partitions_remaining}
+     */
+    public ControlledShutdownResponse(short errorCode, Set<TopicPartition> partitionsRemaining) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+        List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size());
+        for (TopicPartition topicPartition : partitionsRemaining) {
+            Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME);
+            topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
+            topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition());
+            // Without this add the list stays empty and the serialized response never lists any partition.
+            partitionsRemainingList.add(topicPartitionStruct);
+        }
+        struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray());
+
+        this.errorCode = errorCode;
+        this.partitionsRemaining = partitionsRemaining;
+    }
+
+    /**
+     * Wraps an already populated struct and decodes the error code and the remaining partitions from it.
+     *
+     * @param struct struct containing {@code error_code} and {@code partitions_remaining}
+     */
+    public ControlledShutdownResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Set<TopicPartition> partitions = new HashSet<>();
+        for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
+            Struct topicPartition = (Struct) topicPartitionObj;
+            String topic = topicPartition.getString(TOPIC_KEY_NAME);
+            int partition = topicPartition.getInt(PARTITION_KEY_NAME);
+            partitions.add(new TopicPartition(topic, partition));
+        }
+        partitionsRemaining = partitions;
+    }
+
+    /**
+     * @return the top-level error code of this response
+     */
+    public short errorCode() {
+        return errorCode;
+    }
+
+    /**
+     * @return the partitions carried in {@code partitions_remaining}
+     */
+    public Set<TopicPartition> partitionsRemaining() {
+        return partitionsRemaining;
+    }
+
+    /**
+     * Parses a response that was serialized with the current schema version.
+     *
+     * @param buffer buffer positioned at the start of the response body
+     * @return the parsed response
+     */
+    public static ControlledShutdownResponse parse(ByteBuffer buffer) {
+        return new ControlledShutdownResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    /**
+     * Parses a response that was serialized with the schema of the given version.
+     *
+     * @param buffer buffer positioned at the start of the response body
+     * @param version schema version used to read the buffer
+     * @return the parsed response
+     */
+    public static ControlledShutdownResponse parse(ByteBuffer buffer, int version) {
+        return new ControlledShutdownResponse(ProtoUtils.parseResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
new file mode 100644
index 0000000..6b16496
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;;
+import java.util.*;
+
+public class LeaderAndIsrRequest extends AbstractRequest {
+
+    public static class PartitionState {
+        public final int controllerEpoch;
+        public final int leader;
+        public final int leaderEpoch;
+        public final List<Integer> isr;
+        public final int zkVersion;
+        public final Set<Integer> replicas;
+
+        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+            this.controllerEpoch = controllerEpoch;
+            this.leader = leader;
+            this.leaderEpoch = leaderEpoch;
+            this.isr = isr;
+            this.zkVersion = zkVersion;
+            this.replicas = replicas;
+        }
+
+    }
+
+    public static final class EndPoint {
+        public final int id;
+        public final String host;
+        public final int port;
+
+        public EndPoint(int id, String host, int port) {
+            this.id = id;
+            this.host = host;
+            this.port = port;
+        }
+    }
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
+
+    private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
+    private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch";
+    private static final String PARTITION_STATES_KEY_NAME = "partition_states";
+    private static final String LIVE_LEADERS_KEY_NAME = "live_leaders";
+
+    // partition_states key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String LEADER_KEY_NAME = "leader";
+    private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch";
+    private static final String ISR_KEY_NAME = "isr";
+    private static final String ZK_VERSION_KEY_NAME = "zk_version";
+    private static final String REPLICAS_KEY_NAME = "replicas";
+
+    // live_leaders key names
+    private static final String END_POINT_ID_KEY_NAME = "id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final int controllerId;
+    private final int controllerEpoch;
+    private final Map<TopicPartition, PartitionState> partitionStates;
+    private final Set<EndPoint> liveLeaders;
+
+    public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
+                               Set<EndPoint> liveLeaders) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
+        struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
+
+        List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size());
+        for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) {
+            Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME);
+            TopicPartition topicPartition = entry.getKey();
+            partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
+            partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
+            PartitionState partitionState = entry.getValue();
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            partitionStatesData.add(partitionStateData);
+        }
+        struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray());
+
+        List<Struct> leadersData = new ArrayList<>(liveLeaders.size());
+        for (EndPoint leader : liveLeaders) {
+            Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME);
+            leaderData.set(END_POINT_ID_KEY_NAME, leader.id);
+            leaderData.set(HOST_KEY_NAME, leader.host);
+            leaderData.set(PORT_KEY_NAME, leader.port);
+            leadersData.add(leaderData);
+        }
+        struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray());
+
+        this.controllerId = controllerId;
+        this.controllerEpoch = controllerEpoch;
+        this.partitionStates = partitionStates;
+        this.liveLeaders = liveLeaders;
+    }
+
+    public LeaderAndIsrRequest(Struct struct) {
+        super(struct);
+
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+        for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) {
+            Struct partitionStateData = (Struct) partitionStateDataObj;
+            String topic = partitionStateData.getString(TOPIC_KEY_NAME);
+            int partition = partitionStateData.getInt(PARTITION_KEY_NAME);
+            int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME);
+            int leader = partitionStateData.getInt(LEADER_KEY_NAME);
+            int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME);
+
+            Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME);
+            List<Integer> isr = new ArrayList<>(isrArray.length);
+            for (Object r : isrArray)
+                isr.add((Integer) r);
+
+            int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME);
+
+            Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME);
+            Set<Integer> replicas = new HashSet<>(replicasArray.length);
+            for (Object r : replicasArray)
+                replicas.add((Integer) r);
+
+            PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+            partitionStates.put(new TopicPartition(topic, partition), partitionState);
+
+        }
+
+        Set<EndPoint> leaders = new HashSet<>();
+        for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) {
+            Struct leadersData = (Struct) leadersDataObj;
+            int id = leadersData.getInt(END_POINT_ID_KEY_NAME);
+            String host = leadersData.getString(HOST_KEY_NAME);
+            int port = leadersData.getInt(PORT_KEY_NAME);
+            leaders.add(new EndPoint(id, host, port));
+        }
+
+        controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
+        controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
+        this.partitionStates = partitionStates;
+        this.liveLeaders = leaders;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size());
+        for (TopicPartition partition : partitionStates.keySet()) {
+            responses.put(partition, Errors.forException(e).code());
+        }
+
+        switch (versionId) {
+            case 0:
+                return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id)));
+        }
+    }
+
+    public int controllerId() {
+        return controllerId;
+    }
+
+    public int controllerEpoch() {
+        return controllerEpoch;
+    }
+
+    public Map<TopicPartition, PartitionState> partitionStates() {
+        return partitionStates;
+    }
+
+    public Set<EndPoint> liveLeaders() {
+        return liveLeaders;
+    }
+
+    public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) {
+        return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer));
+    }
+
+    public static LeaderAndIsrRequest parse(ByteBuffer buffer) {
+        return new LeaderAndIsrRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
new file mode 100644
index 0000000..3a6f4ee
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class LeaderAndIsrResponse extends AbstractRequestResponse {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id);
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    private static final String PARTITIONS_TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_PARTITION_KEY_NAME = "partition";
+    private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code";
+
+    /**
+     * Possible error code:
+     *
+     * STALE_CONTROLLER_EPOCH (11)
+     */
+    private final short errorCode;
+
+    private final Map<TopicPartition, Short> responses;
+
+    public LeaderAndIsrResponse(Map<TopicPartition, Short> responses) {
+        this(Errors.NONE.code(), responses);
+    }
+
+    public LeaderAndIsrResponse(short errorCode, Map<TopicPartition, Short> responses) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+        List<Struct> responseDatas = new ArrayList<>(responses.size());
+        for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) {
+            Struct partitionData = struct.instance(PARTITIONS_KEY_NAME);
+            TopicPartition partition = response.getKey();
+            partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic());
+            partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition());
+            partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue());
+            responseDatas.add(partitionData);
+        }
+
+        struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray());
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+
+        this.responses = responses;
+        this.errorCode = errorCode;
+    }
+
+    public LeaderAndIsrResponse(Struct struct) {
+        super(struct);
+
+        responses = new HashMap<>();
+        for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) {
+            Struct responseData = (Struct) responseDataObj;
+            String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME);
+            int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME);
+            short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME);
+            responses.put(new TopicPartition(topic, partition), errorCode);
+        }
+
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+    }
+
+    public Map<TopicPartition, Short> responses() {
+        return responses;
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) {
+        return new LeaderAndIsrResponse(ProtoUtils.parseResponse(ApiKeys.LEADER_AND_ISR.id, version, buffer));
+    }
+
+    public static LeaderAndIsrResponse parse(ByteBuffer buffer) {
+        return new LeaderAndIsrResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+}