Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/19 02:18:21 UTC

git commit: KAFKA-1252 Implement retries in new producer.

Repository: kafka
Updated Branches:
  refs/heads/trunk f550cc76c -> 3f0b67b6a


KAFKA-1252 Implement retries in new producer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f0b67b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f0b67b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f0b67b6

Branch: refs/heads/trunk
Commit: 3f0b67b6ac864befccfdd4bb5dee08c0b33c3b43
Parents: f550cc7
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Feb 13 13:48:21 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 18 17:18:13 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   5 +-
 .../kafka/clients/producer/ProducerConfig.java  |  25 +-
 .../clients/producer/internals/Metadata.java    |  32 +--
 .../producer/internals/RecordAccumulator.java   |  53 ++--
 .../clients/producer/internals/RecordBatch.java |  26 +-
 .../clients/producer/internals/Sender.java      | 255 +++++++++++++++----
 .../java/org/apache/kafka/common/Cluster.java   |  40 +--
 .../common/errors/CorruptRecordException.java   |  30 +--
 .../common/errors/InvalidMetadataException.java |  39 +++
 .../errors/LeaderNotAvailableException.java     |  38 ++-
 .../kafka/common/errors/NetworkException.java   |  30 +--
 .../errors/NotLeaderForPartitionException.java  |  29 +--
 .../common/errors/OffsetMetadataTooLarge.java   |  27 +-
 .../errors/OffsetOutOfRangeException.java       |  30 +--
 .../common/errors/RecordTooLargeException.java  |  27 +-
 .../kafka/common/errors/RetriableException.java |  37 +++
 .../kafka/common/errors/RetryableException.java |  47 ----
 .../kafka/common/errors/TimeoutException.java   |  29 +--
 .../common/errors/UnknownServerException.java   |  29 ++-
 .../UnknownTopicOrPartitionException.java       |  29 +--
 .../apache/kafka/common/network/Selector.java   |  30 +--
 .../clients/producer/RecordAccumulatorTest.java |  27 +-
 .../kafka/clients/producer/SenderTest.java      | 139 ++++++++--
 23 files changed, 652 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3d180e8..e4bc972 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -96,7 +96,7 @@ public class KafkaProducer implements Producer {
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
-                                                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL),
+                                                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  new SystemTime());
         List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
@@ -108,7 +108,10 @@ public class KafkaProducer implements Producer {
                                  config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                                  config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                                  (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
+                                 config.getInt(ProducerConfig.MAX_RETRIES_CONFIG),
                                  config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
+                                 config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+                                 config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                                  new SystemTime());
         this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
         this.ioThread.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index dca9802..d8e35e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -62,7 +62,7 @@ public class ProducerConfig extends AbstractConfig {
     /**
      * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
      * faster than they can be delivered to the server the producer will either block or throw an exception based on the
-     * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
+     * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}.
      */
     public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
 
@@ -107,6 +107,11 @@ public class ProducerConfig extends AbstractConfig {
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
 
     /**
+     * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this)
+     */
+    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+
+    /**
      * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
      * has its own cap on record size which may be different from this.
      */
@@ -123,9 +128,17 @@ public class ProducerConfig extends AbstractConfig {
      * this setting is true and we block, however users who want to guarantee we never block can turn this into an
      * error.
      */
-    public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
+    public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
+
+    /**
+     * The maximum number of times to attempt resending the request before giving up.
+     */
+    public static final String MAX_RETRIES_CONFIG = "request.retries";
 
-    public static final String ENABLE_JMX = "enable.jmx";
+    /**
+     * Should we register the Kafka metrics as JMX mbeans?
+     */
+    public static final String ENABLE_JMX_CONFIG = "enable.jmx";
 
     static {
         /* TODO: add docs */
@@ -142,10 +155,12 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")
                                 .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
-                                .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah")
-                                .define(ENABLE_JMX, Type.BOOLEAN, true, "");
+                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
+                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
+                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {

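For context, here is a minimal sketch of how a client could opt into the new retry behavior using the keys defined above. The broker address, the Properties-based KafkaProducer constructor, and the two-argument ProducerRecord are assumptions drawn from the rest of the new producer codebase rather than from this patch:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RetryingProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // broker address is illustrative; the key string behind BROKER_LIST_CONFIG is not shown in this diff
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092");
            // "request.retries" defaults to 0, so retries stay off unless explicitly enabled
            props.put(ProducerConfig.MAX_RETRIES_CONFIG, 3);
            // block rather than throw when the total buffer memory is exhausted
            props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);

            KafkaProducer producer = new KafkaProducer(props);
            producer.send(new ProducerRecord("greetings", "hello".getBytes()));
            producer.close();
        }
    }
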
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 52d30a8..62613a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
@@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 
-
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
@@ -134,4 +129,11 @@ public final class Metadata {
         notifyAll();
     }
 
+    /**
+     * The last time metadata was updated.
+     */
+    public synchronized long lastUpdate() {
+        return this.lastRefresh;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index be8a4a3..ce5cf27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -1,25 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
@@ -39,10 +34,9 @@ import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
-
 /**
- * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be
- * sent to the server.
+ * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
+ * instances to be sent to the server.
  * <p>
  * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
  * this behavior is explicitly disabled.
@@ -152,6 +146,17 @@ public final class RecordAccumulator {
     }
 
     /**
+     * Re-enqueue the given record batch in the accumulator to retry
+     */
+    public void reenqueue(RecordBatch batch, long now) {
+        batch.attempts++;
+        Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
+        synchronized (deque) {
+            deque.addFirst(batch);
+        }
+    }
+
+    /**
      * Get a list of topic-partitions which are ready to be sent.
      * <p>
      * A partition is ready if ANY of the following are true:
@@ -229,16 +234,10 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Deallocate the list of record batches
+     * Deallocate the record batch
      */
-    public void deallocate(Collection<RecordBatch> batches) {
-        ByteBuffer[] buffers = new ByteBuffer[batches.size()];
-        int i = 0;
-        for (RecordBatch batch : batches) {
-            buffers[i] = batch.records.buffer();
-            i++;
-        }
-        free.deallocate(buffers);
+    public void deallocate(RecordBatch batch) {
+        free.deallocate(batch.records.buffer());
     }
 
     /**

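Note that reenqueue() uses addFirst rather than addLast, so a batch that failed with a retriable error jumps ahead of any newer data queued for that partition and goes out again on the next drain. A tiny standalone illustration of that ordering, with plain strings standing in for RecordBatch:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ReenqueueOrderingSketch {
        public static void main(String[] args) {
            Deque<String> deque = new ArrayDeque<String>();
            deque.addLast("batch-1");           // normal append path
            deque.addLast("batch-2");
            String failed = deque.pollFirst();  // batch-1 was sent and failed with a retriable error
            deque.addFirst(failed);             // reenqueue: the retried batch goes back to the head
            System.out.println(deque);          // [batch-1, batch-2] - the retry is drained first
        }
    }
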
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 7a440a3..eb16f6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
@@ -25,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 
-
 /**
  * A batch of records that is or will be sent.
  * 
@@ -33,6 +28,7 @@ import org.apache.kafka.common.record.MemoryRecords;
  */
 public final class RecordBatch {
     public int recordCount = 0;
+    public volatile int attempts = 0;
     public final long created;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index d93a455..e373265 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -22,12 +22,15 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
@@ -41,6 +44,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 
 /**
  * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
@@ -48,19 +52,55 @@ import org.apache.kafka.common.utils.Time;
  */
 public class Sender implements Runnable {
 
-    private final Map<Integer, NodeState> nodeState;
+    /* the state of each node's connection */
+    private final NodeStates nodeStates;
+
+    /* the record accumulator that batches records */
     private final RecordAccumulator accumulator;
+
+    /* the selector used to perform network i/o */
     private final Selectable selector;
+
+    /* the client id used to identify this client in requests to the server */
     private final String clientId;
+
+    /* the maximum request size to attempt to send to the server */
     private final int maxRequestSize;
-    private final long reconnectBackoffMs;
+
+    /* the number of acknowledgements to request from the server */
     private final short acks;
+
+     /* the max time in ms for the server to wait for acknowledgements */
     private final int requestTimeout;
+
+    /* the number of times to retry a failed request before giving up */
+    private final int retries;
+
+    /* the socket send buffer size in bytes */
+    private final int socketSendBuffer;
+
+    /* the socket receive buffer size in bytes */
+    private final int socketReceiveBuffer;
+
+    /* the set of currently in-flight requests awaiting a response from the server */
     private final InFlightRequests inFlightRequests;
+
+    /* a reference to the current Cluster instance */
     private final Metadata metadata;
+
+    /* the clock instance used for getting the time */
     private final Time time;
+
+    /* the current node to attempt to use for metadata requests (will round-robin over nodes) */
+    private int metadataFetchNodeIndex;
+
+    /* the current correlation id to use when sending requests to servers */
     private int correlation;
+
+    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
     private boolean metadataFetchInProgress;
+
+    /* true while the sender thread is still running */
     private volatile boolean running;
 
     public Sender(Selectable selector,
@@ -70,22 +110,28 @@ public class Sender implements Runnable {
                   int maxRequestSize,
                   long reconnectBackoffMs,
                   short acks,
+                  int retries,
                   int requestTimeout,
+                  int socketSendBuffer,
+                  int socketReceiveBuffer,
                   Time time) {
-        this.nodeState = new HashMap<Integer, NodeState>();
+        this.nodeStates = new NodeStates(reconnectBackoffMs);
         this.accumulator = accumulator;
         this.selector = selector;
         this.maxRequestSize = maxRequestSize;
-        this.reconnectBackoffMs = reconnectBackoffMs;
         this.metadata = metadata;
         this.clientId = clientId;
         this.running = true;
         this.requestTimeout = requestTimeout;
         this.acks = acks;
+        this.retries = retries;
+        this.socketSendBuffer = socketSendBuffer;
+        this.socketReceiveBuffer = socketReceiveBuffer;
         this.inFlightRequests = new InFlightRequests();
         this.correlation = 0;
         this.metadataFetchInProgress = false;
         this.time = time;
+        this.metadataFetchNodeIndex = new Random().nextInt();
     }
 
     /**
@@ -130,11 +176,7 @@ public class Sender implements Runnable {
 
         // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
-        if (metadataReq != null) {
-            sends.add(metadataReq.request);
-            this.inFlightRequests.add(metadataReq);
-        }
+        maybeUpdateMetadata(cluster, sends, now);
 
         // prune the list of ready topics to eliminate any that we aren't ready to send yet
         List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
@@ -158,43 +200,76 @@ public class Sender implements Runnable {
         // handle responses, connections, and disconnections
         handleSends(this.selector.completedSends());
         handleResponses(this.selector.completedReceives(), now);
-        handleDisconnects(this.selector.disconnected());
+        handleDisconnects(this.selector.disconnected(), now);
         handleConnects(this.selector.connected());
 
         return ready.size();
     }
 
-    private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
+    /**
+     * Add a metadata request to the list of sends if we need to make one
+     */
+    private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
         if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
-            return null;
+            return;
 
-        Node node = nextFreeNode(cluster);
+        Node node = selectMetadataDestination(cluster);
         if (node == null)
-            return null;
+            return;
 
-        NodeState state = nodeState.get(node.id());
-        if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
+        if (nodeStates.isConnected(node.id())) {
+            this.metadataFetchInProgress = true;
+            InFlightRequest request = metadataRequest(node.id(), metadata.topics());
+            sends.add(request.request);
+            this.inFlightRequests.add(request);
+        } else if (nodeStates.canConnect(node.id(), now)) {
             // we don't have a connection to this node right now, make one
             initiateConnect(node, now);
-            return null;
-        } else if (state.state == ConnectionState.CONNECTED) {
-            this.metadataFetchInProgress = true;
-            return metadataRequest(node.id(), metadata.topics());
-        } else {
-            return null;
         }
     }
 
     /**
+     * Find a good node to make a metadata request to. This method will first look for a node that has an existing
+     * connection and no outstanding requests. If there are no such nodes, it will look for any node with no outstanding
+     * requests.
      * @return A node with no requests currently being sent or null if no such node exists
      */
-    private Node nextFreeNode(Cluster cluster) {
-        for (int i = 0; i < cluster.nodes().size(); i++) {
-            Node node = cluster.nextNode();
-            if (this.inFlightRequests.canSendMore(node.id()))
+    private Node selectMetadataDestination(Cluster cluster) {
+        List<Node> nodes = cluster.nodes();
+
+        // first look for a node to which we are connected and have no outstanding requests
+        boolean connectionInProgress = false;
+        for (int i = 0; i < nodes.size(); i++) {
+            Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
+            if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) {
+                this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
+                return node;
+            } else if (nodeStates.isConnecting(node.id())) {
+                connectionInProgress = true;
+            }
+        }
+
+        // if we have a connection that is being established now, just wait for it rather than making another
+        if (connectionInProgress)
+            return null;
+
+        // okay, no luck, pick a random unused node
+        for (int i = 0; i < nodes.size(); i++) {
+            Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
+            if (this.inFlightRequests.canSendMore(node.id())) {
+                this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
                 return node;
+            }
         }
-        return null;
+
+        return null; // we failed to find a good destination
+    }
+
+    /**
+     * Get the index in the node list of the node to use for the metadata request
+     */
+    private int metadataNodeIndex(int offset, int size) {
+        return Utils.abs(offset + this.metadataFetchNodeIndex) % size;
     }
 
     /**
@@ -209,7 +284,7 @@ public class Sender implements Runnable {
     /**
      * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
      * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
-     * metdata to be able to do so
+     * metadata to be able to do so
      */
     private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
         List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
@@ -218,15 +293,11 @@ public class Sender implements Runnable {
             if (node == null) {
                 // we don't know about this topic/partition or it has no leader, re-fetch metadata
                 metadata.forceUpdate();
-            } else {
-                NodeState state = nodeState.get(node.id());
-                // TODO: encapsulate this logic somehow
-                if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
-                    // we don't have a connection to this node right now, make one
-                    initiateConnect(node, now);
-                } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) {
-                    sendable.add(tp);
-                }
+            } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
+                sendable.add(tp);
+            } else if (nodeStates.canConnect(node.id(), now)) {
+                // we don't have a connection to this node right now, make one
+                initiateConnect(node, now);
             }
         }
         return sendable;
@@ -237,13 +308,11 @@ public class Sender implements Runnable {
      */
     private void initiateConnect(Node node, long now) {
         try {
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO
-                                                                                                                              // socket
-                                                                                                                              // buffers
-            nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
+            this.nodeStates.connecting(node.id(), now);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
-            nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now));
+            nodeStates.disconnected(node.id());
             /* maybe the problem is our metadata, update it */
             metadata.forceUpdate();
         }
@@ -252,19 +321,26 @@ public class Sender implements Runnable {
     /**
      * Handle any closed connections
      */
-    private void handleDisconnects(List<Integer> disconnects) {
+    private void handleDisconnects(List<Integer> disconnects, long now) {
+        // clear out the in-flight requests for the disconnected broker
         for (int node : disconnects) {
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
                 if (request.batches != null) {
-                    for (RecordBatch batch : request.batches.values())
-                        batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                    this.accumulator.deallocate(request.batches.values());
+                    for (RecordBatch batch : request.batches.values()) {
+                        if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
+                            this.accumulator.reenqueue(batch, now);
+                        } else {
+                            batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                            this.accumulator.deallocate(batch);
+                        }
+                    }
                 }
-                NodeState state = this.nodeState.get(request.request.destination());
-                if (state != null)
-                    state.state = ConnectionState.DISCONNECTED;
+                nodeStates.disconnected(request.request.destination());
             }
         }
+        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
+        if (disconnects.size() > 0)
+            this.metadata.forceUpdate();
     }
 
     /**
@@ -272,7 +348,7 @@ public class Sender implements Runnable {
      */
     private void handleConnects(List<Integer> connects) {
         for (Integer id : connects)
-            this.nodeState.get(id).state = ConnectionState.CONNECTED;
+            this.nodeStates.connected(id);
     }
 
     /**
@@ -286,9 +362,10 @@ public class Sender implements Runnable {
             if (!request.expectResponse) {
                 requests.pollFirst();
                 if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
-                    for (RecordBatch batch : request.batches.values())
+                    for (RecordBatch batch : request.batches.values()) {
                         batch.done(-1L, Errors.NONE.exception());
-                    this.accumulator.deallocate(request.batches.values());
+                        this.accumulator.deallocate(batch);
+                    }
                 }
             }
         }
@@ -306,7 +383,7 @@ public class Sender implements Runnable {
             Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
             correlate(req.request.header(), header);
             if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
-                handleProduceResponse(req, body);
+                handleProduceResponse(req, body, now);
             else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
                 handleMetadataResponse(body, now);
             else
@@ -327,7 +404,7 @@ public class Sender implements Runnable {
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, Struct response) {
+    private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
         for (Object topicResponse : (Object[]) response.get("responses")) {
             Struct topicRespStruct = (Struct) topicResponse;
             String topic = (String) topicRespStruct.get("topic");
@@ -335,12 +412,31 @@ public class Sender implements Runnable {
                 Struct partRespStruct = (Struct) partResponse;
                 int partition = (Integer) partRespStruct.get("partition");
                 short errorCode = (Short) partRespStruct.get("error_code");
+
+                // if we got an error we may need to refresh our metadata
+                Errors error = Errors.forCode(errorCode);
+                if (error.exception() instanceof InvalidMetadataException)
+                    metadata.forceUpdate();
+
                 long offset = (Long) partRespStruct.get("base_offset");
                 RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
-                batch.done(offset, Errors.forCode(errorCode).exception());
+                if (canRetry(batch, error)) {
+                    // retry
+                    this.accumulator.reenqueue(batch, now);
+                } else {
+                    // tell the user the result of their request
+                    batch.done(offset, error.exception());
+                    this.accumulator.deallocate(batch);
+                }
             }
         }
-        this.accumulator.deallocate(request.batches.values());
+    }
+
+    /**
+     * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
+     */
+    private boolean canRetry(RecordBatch batch, Errors error) {
+        return batch.attempts < this.retries && error.exception() instanceof RetriableException;
     }
 
     /**
@@ -459,6 +555,53 @@ public class Sender implements Runnable {
         }
     }
 
+    private static class NodeStates {
+        private final long reconnectBackoffMs;
+        private final Map<Integer, NodeState> nodeState;
+
+        public NodeStates(long reconnectBackoffMs) {
+            this.reconnectBackoffMs = reconnectBackoffMs;
+            this.nodeState = new HashMap<Integer, NodeState>();
+        }
+
+        public boolean canConnect(int node, long now) {
+            NodeState state = nodeState.get(node);
+            if (state == null)
+                return true;
+            else
+                return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs;
+        }
+
+        public void connecting(int node, long now) {
+            nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now));
+        }
+
+        public boolean isConnected(int node) {
+            NodeState state = nodeState.get(node);
+            return state != null && state.state == ConnectionState.CONNECTED;
+        }
+
+        public boolean isConnecting(int node) {
+            NodeState state = nodeState.get(node);
+            return state != null && state.state == ConnectionState.CONNECTING;
+        }
+
+        public void connected(int node) {
+            nodeState(node).state = ConnectionState.CONNECTED;
+        }
+
+        public void disconnected(int node) {
+            nodeState(node).state = ConnectionState.DISCONNECTED;
+        }
+
+        private NodeState nodeState(int node) {
+            NodeState state = this.nodeState.get(node);
+            if (state == null)
+                throw new IllegalStateException("No entry found for node " + node);
+            return state;
+        }
+    }
+
     /**
      * A request that hasn't been fully processed yet
      */

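The retry decision in canRetry() above leans entirely on the exception hierarchy reworked in the following files: anything extending the new RetriableException (including the InvalidMetadataException subtree) is eligible for re-enqueue, while everything else completes the batch with an error. A small standalone sketch of that classification, using only constructors visible in this patch and assuming (as elsewhere in the client) that these exceptions are unchecked:

    import org.apache.kafka.common.errors.LeaderNotAvailableException;
    import org.apache.kafka.common.errors.RetriableException;

    public class RetryClassificationSketch {
        // mirrors Sender.canRetry(): transient errors are re-enqueued while attempts remain
        static boolean canRetry(int attempts, int maxRetries, RuntimeException error) {
            return attempts < maxRetries && error instanceof RetriableException;
        }

        public static void main(String[] args) {
            RuntimeException noLeader = new LeaderNotAvailableException("no leader for partition");
            System.out.println(canRetry(0, 3, noLeader));                        // true: retriable and attempts remain
            System.out.println(canRetry(3, 3, noLeader));                        // false: retries exhausted
            System.out.println(canRetry(0, 3, new IllegalStateException("x")));  // false: not a RetriableException
        }
    }
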
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c17a8f8..5caaaae 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common;
 
@@ -23,17 +19,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.common.utils.Utils;
-
 
 /**
  * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
  */
 public final class Cluster {
 
-    private final AtomicInteger counter = new AtomicInteger(0);
     private final List<Node> nodes;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
@@ -126,15 +117,4 @@ public final class Cluster {
         return this.partitionsByTopic.get(topic);
     }
 
-    /**
-     * Round-robin over the nodes in this cluster
-     */
-    public Node nextNode() {
-        int size = nodes.size();
-        if (size == 0)
-            throw new IllegalStateException("No known nodes.");
-        int idx = Utils.abs(counter.getAndIncrement()) % size;
-        return this.nodes.get(idx);
-    }
-
 }

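The round-robin that used to live in Cluster.nextNode() now lives in the Sender as metadataFetchNodeIndex, which is seeded with Random().nextInt() and can therefore be negative; Utils.abs(offset + index) % size keeps the computed position in range. A small sketch of that wrap-around arithmetic, using a local stand-in for Utils.abs (plain Math.abs stays negative for Integer.MIN_VALUE, which is presumably why the helper is used):

    public class MetadataNodeIndexSketch {
        // local stand-in for org.apache.kafka.common.utils.Utils.abs: mask off the sign bit so even
        // Integer.MIN_VALUE maps to a non-negative int (Math.abs(Integer.MIN_VALUE) is still negative)
        static int abs(int n) {
            return n & 0x7fffffff;
        }

        public static void main(String[] args) {
            int metadataFetchNodeIndex = new java.util.Random().nextInt(); // may be negative
            int clusterSize = 3;
            for (int offset = 0; offset < clusterSize; offset++) {
                // same shape as Sender.metadataNodeIndex(offset, size): always lands in [0, clusterSize)
                System.out.println(abs(offset + metadataFetchNodeIndex) % clusterSize);
            }
        }
    }
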
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
index 673f61d..eaccf27 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java
@@ -1,22 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class CorruptRecordException extends ApiException {
+/**
+ * This exception indicates that a record has failed its internal CRC check; this generally points to network or disk
+ * corruption.
+ */
+public class CorruptRecordException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
new file mode 100644
index 0000000..8841bad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * An exception that may indicate the client's metadata is out of date
+ */
+public abstract class InvalidMetadataException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidMetadataException() {
+        super();
+    }
+
+    public InvalidMetadataException(String message) {
+        super(message);
+    }
+
+    public InvalidMetadataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvalidMetadataException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
index 0bde6b5..9d7ebd4 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java
@@ -1,35 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class LeaderNotAvailableException extends RetryableException {
+/**
+ * There is no currently available leader for the given partition (either because a leadership election is in progress
+ * or because all replicas are down).
+ */
+public class LeaderNotAvailableException extends InvalidMetadataException {
 
     private static final long serialVersionUID = 1L;
 
-    public LeaderNotAvailableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
     public LeaderNotAvailableException(String message) {
         super(message);
     }
 
-    public LeaderNotAvailableException(Throwable cause) {
-        super(cause);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
index 3a04159..f0baa98 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java
@@ -1,22 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class NetworkException extends ApiException {
+/**
+ * A misc. network-related IOException occurred when making a request. This could be because the client's metadata is
+ * out of date and it is making a request to a node that is now dead.
+ */
+public class NetworkException extends InvalidMetadataException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
index 5adc72c..ad9c77c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
@@ -1,22 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class NotLeaderForPartitionException extends RetryableException {
+/**
+ * This server is not the leader for the given partition
+ */
+public class NotLeaderForPartitionException extends InvalidMetadataException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
index a3159bb..0be2f50 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java
@@ -1,21 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server.
+ */
 public class OffsetMetadataTooLarge extends ApiException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
index d01698a..fc7c6e3 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java
@@ -1,22 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class OffsetOutOfRangeException extends ApiException {
+/**
+ * The requested offset is outside the range of offsets the server maintains for the given partition.
+ * 
+ */
+public class OffsetOutOfRangeException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
index ce95ca0..737b7f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java
@@ -1,21 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This record is larger than the maximum allowable size.
+ */
 public class RecordTooLargeException extends ApiException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
new file mode 100644
index 0000000..6c639a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * A retriable exception is a transient exception that, if retried, may succeed.
+ */
+public abstract class RetriableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetriableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RetriableException(String message) {
+        super(message);
+    }
+
+    public RetriableException(Throwable cause) {
+        super(cause);
+    }
+
+    public RetriableException() {
+    }
+
+}
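To illustrate the contract this class establishes, here is a minimal sketch of a caller that does its own synchronous retries and keys off RetriableException to decide whether a failure is worth re-attempting; the RetryUtil helper is hypothetical and not part of this patch.

    import java.util.concurrent.Callable;

    import org.apache.kafka.common.errors.RetriableException;

    // Hypothetical helper (not part of this patch): retry an action only when the
    // failure is transient, i.e. a RetriableException; anything else propagates.
    public final class RetryUtil {
        public static <T> T withRetries(Callable<T> action, int maxRetries) throws Exception {
            for (int attempt = 0;; attempt++) {
                try {
                    return action.call();
                } catch (RetriableException e) {
                    if (attempt >= maxRetries)
                        throw e; // still transient, but we are out of attempts
                }
            }
        }
    }

This is roughly the policy the Sender now applies internally: batches that fail with a retriable error are re-enqueued and resent until the configured retry count is exhausted.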

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
deleted file mode 100644
index c7f2f22..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-/**
- * A retryable exception is an exception that is safe to retry. To be retryable an exception should be
- * <ol>
- * <li>Transient, there is no point retrying a error due to a non-existant topic or message too large
- * <li>Idempotent, the exception is known to not change any state on the server
- * </ol>
- * A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to
- * retry.
- */
-public abstract class RetryableException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RetryableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RetryableException(String message) {
-        super(message);
-    }
-
-    public RetryableException(Throwable cause) {
-        super(cause);
-    }
-
-    public RetryableException() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
index dffd64d..c7f569c 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java
@@ -1,22 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class TimeoutException extends ApiException {
+/**
+ * Indicates that a request timed out.
+ */
+public class TimeoutException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
index a0690fe..963ef08 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java
@@ -1,21 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * An error occurred on the server for which the client does not have a corresponding error code. This is generally an
+ * unexpected error.
+ * 
+ */
 public class UnknownServerException extends ApiException {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
index 73d1953..ec423bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -1,22 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.errors;
 
-public class UnknownTopicOrPartitionException extends ApiException {
+/**
+ * This topic/partition does not exist.
+ */
+public class UnknownTopicOrPartitionException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8ed4c73..f1e474c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -32,7 +28,6 @@ import java.util.Set;
 
 import org.apache.kafka.common.KafkaException;
 
-
 /**
  * A selector interface for doing non-blocking multi-connection network I/O.
  * <p>
@@ -302,8 +297,11 @@ public class Selector implements Selectable {
     private void close(SelectionKey key) throws IOException {
         SocketChannel channel = channel(key);
         Transmissions trans = transmissions(key);
-        if (trans != null)
+        if (trans != null) {
             this.disconnected.add(trans.id);
+            trans.clearReceive();
+            trans.clearSend();
+        }
         key.attach(null);
         key.cancel();
         channel.socket().close();

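The added clearReceive()/clearSend() calls drop any in-flight transmission state when a connection is closed; the connection id still ends up in disconnected(), which callers are expected to turn into a retriable failure (the SenderTest changes below check this by expecting a NETWORK_EXCEPTION). A caller-side sketch, assuming NetworkException keeps a String-message constructor; DisconnectHandler and failInFlight are hypothetical placeholders:

    import java.util.List;

    import org.apache.kafka.common.errors.NetworkException;

    // Hypothetical caller-side sketch: every node id the selector reports as
    // disconnected should have its in-flight request failed with a retriable
    // NetworkException so that higher layers can retry or refresh metadata.
    public class DisconnectHandler {
        public void handleDisconnects(List<Integer> disconnected) {
            for (int node : disconnected)
                failInFlight(node, new NetworkException("Lost connection to node " + node));
        }

        private void failInFlight(int node, NetworkException e) {
            // Caller-specific bookkeeping: complete the pending futures for this
            // node exceptionally so the affected batches can be retried or failed.
        }
    }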
http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 1bbe83c..a3bf07e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
@@ -25,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.RecordBatch;
 import org.apache.kafka.common.TopicPartition;
@@ -140,8 +135,8 @@ public class RecordAccumulatorTest {
             for (RecordBatch batch : batches) {
                 for (LogEntry entry : batch.records)
                     read++;
+                accum.deallocate(batch);
             }
-            accum.deallocate(batches);
         }
 
         for (Thread t : threads)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f0b67b6/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 41c028b..19a0125 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -1,29 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
@@ -46,6 +42,16 @@ import org.junit.Test;
 
 public class SenderTest {
 
+    private static final String CLIENT_ID = "";
+    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
+    private static final long RECONNECT_BACKOFF_MS = 0L;
+    private static final short ACKS_ALL = -1;
+    private static final int MAX_RETRIES = 0;
+    private static final int REQUEST_TIMEOUT_MS = 10000;
+    private static final int SEND_BUFFER_SIZE = 64 * 1024;
+    private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
+
+    private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
     private MockSelector selector = new MockSelector(time);
     private int batchSize = 16 * 1024;
@@ -53,7 +59,18 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
-    private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
+    private Sender sender = new Sender(selector,
+                                       metadata,
+                                       this.accumulator,
+                                       CLIENT_ID,
+                                       MAX_REQUEST_SIZE,
+                                       RECONNECT_BACKOFF_MS,
+                                       ACKS_ALL,
+                                       MAX_RETRIES,
+                                       REQUEST_TIMEOUT_MS,
+                                       SEND_BUFFER_SIZE,
+                                       RECEIVE_BUFFER_SIZE,
+                                       time);
 
     @Before
     public void setup() {
@@ -62,7 +79,6 @@ public class SenderTest {
 
     @Test
     public void testSimple() throws Exception {
-        TopicPartition tp = new TopicPartition("test", 0);
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
         sender.run(time.milliseconds());
         assertEquals("We should have connected", 1, selector.connected().size());
@@ -83,6 +99,93 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
     }
 
+    @Test
+    public void testRetries() throws Exception {
+        // create a sender with retries = 1
+        int maxRetries = 1;
+        Sender sender = new Sender(selector,
+                                   metadata,
+                                   this.accumulator,
+                                   CLIENT_ID,
+                                   MAX_REQUEST_SIZE,
+                                   RECONNECT_BACKOFF_MS,
+                                   ACKS_ALL,
+                                   maxRetries,
+                                   REQUEST_TIMEOUT_MS,
+                                   SEND_BUFFER_SIZE,
+                                   RECEIVE_BUFFER_SIZE,
+                                   time);
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        RequestSend request1 = completeSend(sender);
+        selector.clear();
+        selector.completeReceive(produceResponse(request1.header().correlationId(),
+                                                 cluster.leaderFor(tp).id(),
+                                                 tp.topic(),
+                                                 tp.partition(),
+                                                 -1,
+                                                 Errors.REQUEST_TIMED_OUT.code()));
+        sender.run(time.milliseconds());
+        selector.clear();
+        sender.run(time.milliseconds());
+        RequestSend request2 = completeSend(sender);
+        selector.completeReceive(produceResponse(request2.header().correlationId(),
+                                                 cluster.leaderFor(tp).id(),
+                                                 tp.topic(),
+                                                 tp.partition(),
+                                                 42,
+                                                 Errors.NONE.code()));
+        sender.run(time.milliseconds());
+        assertTrue("Request should retry and complete", future.isDone());
+        assertEquals(42, future.get().offset());
+    }
+
+    @Test
+    public void testMetadataRefreshOnNoLeaderException() throws Exception {
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        RequestSend request = completeSend();
+        selector.clear();
+        selector.completeReceive(produceResponse(request.header().correlationId(),
+                                                 cluster.leaderFor(tp).id(),
+                                                 tp.topic(),
+                                                 tp.partition(),
+                                                 -1,
+                                                 Errors.NOT_LEADER_FOR_PARTITION.code()));
+        sender.run(time.milliseconds());
+        completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION);
+        assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+    }
+
+    @Test
+    public void testMetadataRefreshOnDisconnect() throws Exception {
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        completeSend();
+        selector.clear();
+        selector.disconnect(cluster.leaderFor(tp).id());
+        sender.run(time.milliseconds());
+        completedWithError(future, Errors.NETWORK_EXCEPTION);
+        assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
+    }
+
+    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
+        assertTrue("Request should be completed", future.isDone());
+        try {
+            future.get();
+            fail("Should have thrown an exception.");
+        } catch (ExecutionException e) {
+            assertEquals(error.exception().getClass(), e.getCause().getClass());
+        }
+    }
+
+    private RequestSend completeSend() {
+        return completeSend(sender);
+    }
+
+    private RequestSend completeSend(Sender sender) {
+        while (selector.completedSends().size() == 0)
+            sender.run(time.milliseconds());
+        return (RequestSend) selector.completedSends().get(0);
+    }
+
     private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
         Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
         Struct response = struct.instance("responses");