You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/03/01 00:51:58 UTC

[2/2] kafka git commit: KAFKA-1865 Add a flush() method to the producer.

KAFKA-1865 Add a flush() method to the producer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0636928d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0636928d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0636928d

Branch: refs/heads/trunk
Commit: 0636928d961a6ceaab46d908f9372d913c3e5faf
Parents: 22ff9e9
Author: Jay Kreps <ja...@gmail.com>
Authored: Sat Feb 7 12:01:51 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sat Feb 28 14:11:59 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java |  10 +-
 .../kafka/clients/producer/KafkaProducer.java   | 187 ++++++++++++---
 .../kafka/clients/producer/MockProducer.java    |   5 +
 .../apache/kafka/clients/producer/Producer.java |   5 +
 .../kafka/clients/producer/ProducerRecord.java  |  20 +-
 .../internals/FutureRecordMetadata.java         |  10 +-
 .../producer/internals/RecordAccumulator.java   |  77 ++++++-
 .../clients/producer/internals/RecordBatch.java |  13 +-
 .../kafka/common/errors/InterruptException.java |  34 +++
 .../apache/kafka/common/utils/SystemTime.java   |   2 +-
 .../org/apache/kafka/clients/MetadataTest.java  | 103 +++++++++
 .../kafka/clients/producer/BufferPoolTest.java  | 193 ----------------
 .../kafka/clients/producer/MetadataTest.java    |  95 --------
 .../clients/producer/MockProducerTest.java      |   6 +
 .../kafka/clients/producer/PartitionerTest.java |  69 ------
 .../clients/producer/RecordAccumulatorTest.java | 207 -----------------
 .../kafka/clients/producer/SenderTest.java      | 155 -------------
 .../producer/internals/BufferPoolTest.java      | 193 ++++++++++++++++
 .../producer/internals/PartitionerTest.java     |  68 ++++++
 .../internals/RecordAccumulatorTest.java        | 228 +++++++++++++++++++
 .../clients/producer/internals/SenderTest.java  | 154 +++++++++++++
 .../kafka/api/ProducerSendTest.scala            |  62 +++--
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 23 files changed, 1097 insertions(+), 803 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index e8afecd..c8bde8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -99,19 +99,15 @@ public final class Metadata {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
         if (maxWaitMs < 0) {
             throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
         }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         while (this.version <= lastVersion) {
-            try {
-                if (remainingWaitMs != 0) {
-                    wait(remainingWaitMs);
-                }
-            } catch (InterruptedException e) { /* this is fine */
-            }
+            if (remainingWaitMs != 0)
+                wait(remainingWaitMs);
             long elapsed = System.currentTimeMillis() - begin;
             if (elapsed >= maxWaitMs)
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1fd6917..7397e56 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -55,10 +56,66 @@ import org.slf4j.LoggerFactory;
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
- * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than
+ * having multiple instances.
  * <p>
- * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these resources.
+ * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value
+ * pairs.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("acks", "all");
+ * props.put("retries", 0);
+ * props.put("batch.size", 16384);
+ * props.put("linger.ms", 1);
+ * props.put("buffer.memory", 33554432);
+ * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * 
+ * Producer<String, String> producer = new KafkaProducer(props);
+ * for(int i = 0; i < 100; i++)
+ *     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
+ * 
+ * producer.close();
+ * }</pre>
+ * <p>
+ * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
+ * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
+ * to the cluster. Failure to close the producer after use will leak these resources.
+ * <p>
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends
+ * and immediately returns. This allows the producer to batch together individual records for efficiency.
+ * <p>
+ * The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting
+ * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
+ * <p>
+ * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
+ * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on 
+ * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
+ * <p>
+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by 
+ * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
+ * generally have one of these buffers for each active partition).
+ * <p>
+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you 
+ * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
+ * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will 
+ * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, 
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting 
+ * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that 
+ * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load 
+ * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
+ * efficient requests when not under maximal load at the cost of a small amount of latency.
+ * <p>
+ * The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records
+ * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is 
+ * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which
+ * will cause the send call to result in an exception.
+ * <p>
+ * The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with
+ * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
+ * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types.
  */
 public class KafkaProducer<K, V> implements Producer<K, V> {
 
@@ -241,8 +298,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
-     * @param record  The record to be sent
+     * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. 
+     * See {@link #send(ProducerRecord, Callback)} for details.
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@@ -261,53 +318,59 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * <p>
      * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
      * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
-     * get()} on this future will result in the metadata for the record or throw any exception that occurred while
-     * sending the record.
+     * get()} on this future will block until the associated request completes and then return the metadata for the record 
+     * or throw any exception that occurred while sending the record.
      * <p>
-     * If you want to simulate a simple blocking call you can do the following:
+     * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
      * 
-     * <pre>{@code
-     * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * <pre>
+     * {@code
+     * byte[] key = "key".getBytes();
+     * byte[] value = "value".getBytes();
+     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
+     * producer.send(record).get();
      * }</pre>
      * <p>
-     * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
+     * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      * 
-     * <pre>{@code
-     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
-     *   producer.send(myRecord,
-     *                new Callback() {
-     *                     public void onCompletion(RecordMetadata metadata, Exception e) {
-     *                         if(e != null)
-     *                             e.printStackTrace();
-     *                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
-     *                     }
-     *                });
-     * }</pre>
+     * <pre>
+     * {@code
+     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
+     * producer.send(record,
+     *               new Callback() {
+     *                   public void onCompletion(RecordMetadata metadata, Exception e) {
+     *                       if(e != null)
+     *                           e.printStackTrace();
+     *                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
+     *                   }
+     *               });
+     * }
+     * </pre>
      * 
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
      * 
-     * <pre>{@code
+     * <pre>
+     * {@code
      * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
      * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
-     * }</pre>
+     * }
+     * </pre>
      * <p>
      * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * <p>
-     * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is
-     * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
-     * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
-     * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
-     * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
-     * producer to instead throw an exception when buffer memory is exhausted.
      * 
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
+     *        
+     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws SerializationException If the key or value are not valid objects given the configured serializers
+     * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
+     * 
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -352,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         } catch (KafkaException e) {
             this.errors.record();
             throw e;
@@ -364,7 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      */
-    private void waitOnMetadata(String topic, long maxWaitMs) {
+    private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         if (metadata.fetch().partitionsForTopic(topic) != null) {
             return;
         } else {
@@ -399,20 +462,72 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
+    
+    /**
+     * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is 
+     * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
+     * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>). 
+     * A request is considered completed when it is successfully acknowledged 
+     * according to the <code>acks</code> configuration you have specified or else it results in an error.
+     * <p>
+     * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
+     * however no guarantee is made about the completion of records sent after the flush call begins.
+     * <p>
+     * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
+     * gives a convenient way to ensure all previously sent messages have actually completed.
+     * <p>
+     * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
+     * <pre>
+     * {@code
+     * for(ConsumerRecord<String, String> record: consumer.poll(100))
+     *     producer.send(new ProducerRecord("my-topic", record.key(), record.value()));
+     * producer.flush();
+     * consumer.commit();
+     * }
+     * </pre>
+     * 
+     * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
+     * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
+     * 
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
+    @Override
+    public void flush() {
+        log.trace("Flushing accumulated records in producer.");
+        this.accumulator.beginFlush();
+        this.sender.wakeup();
+        try {
+            this.accumulator.awaitFlushCompletion();
+        } catch (InterruptedException e) {
+            throw new InterruptException("Flush interrupted.", e);
+        }
+    }
 
+    /**
+     * Get the partition metadata for the given topic. This can be used for custom partitioning.
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        try {
+            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
         return this.metadata.fetch().partitionsForTopic(topic);
     }
 
+    /**
+     * Get the full set of internal metrics maintained by the producer.
+     */
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
     }
 
     /**
-     * Close this producer. This method blocks until all in-flight requests complete.
+     * Close this producer. This method blocks until all previously sent requests complete.
+     * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void close() {
@@ -421,7 +536,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         try {
             this.ioThread.join();
         } catch (InterruptedException e) {
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         }
         this.metrics.close();
         this.keySerializer.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 84530f2..6913090 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -128,6 +128,11 @@ public class MockProducer implements Producer<byte[], byte[]> {
             return offset;
         }
     }
+    
+    public synchronized void flush() {
+        while (!this.completions.isEmpty())
+            completeNext();
+    }
 
     public List<PartitionInfo> partitionsFor(String topic) {
         return this.cluster.partitionsForTopic(topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 17fe541..5b3e75e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -45,6 +45,11 @@ public interface Producer<K, V> extends Closeable {
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
+    
+    /**
+     * Flush any accumulated records from the producer. Blocks until all sends are complete.
+     */
+    public void flush();
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 4990692..75cd51e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -102,15 +102,21 @@ public final class ProducerRecord<K, V> {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof ProducerRecord)) return false;
+        if (this == o)
+            return true;
+        else if (!(o instanceof ProducerRecord))
+            return false;
 
-        ProducerRecord that = (ProducerRecord) o;
+        ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o;
 
-        if (key != null ? !key.equals(that.key) : that.key != null) return false;
-        if (partition != null ? !partition.equals(that.partition) : that.partition != null) return false;
-        if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
-        if (value != null ? !value.equals(that.value) : that.value != null) return false;
+        if (key != null ? !key.equals(that.key) : that.key != null) 
+            return false;
+        else if (partition != null ? !partition.equals(that.partition) : that.partition != null) 
+            return false;
+        else if (topic != null ? !topic.equals(that.topic) : that.topic != null) 
+            return false;
+        else if (value != null ? !value.equals(that.value) : that.value != null) 
+            return false;
 
         return true;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 4a2da41..e2d9ca8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -51,13 +51,17 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
         return valueOrError();
     }
 
-    private RecordMetadata valueOrError() throws ExecutionException {
+    RecordMetadata valueOrError() throws ExecutionException {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
-            return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+            return value();
     }
-
+    
+    RecordMetadata value() {
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+    }
+    
     public long relativeOffset() {
         return this.relativeOffset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ecfe214..d5c79e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.Cluster;
@@ -55,6 +56,7 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
+    private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
@@ -62,6 +64,7 @@ public final class RecordAccumulator {
     private final BufferPool free;
     private final Time time;
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final IncompleteRecordBatches incomplete;
 
     /**
      * Create a new record accumulator
@@ -89,12 +92,14 @@ public final class RecordAccumulator {
                              Map<String, String> metricTags) {
         this.drainIndex = 0;
         this.closed = false;
+        this.flushesInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
+        this.incomplete = new IncompleteRecordBatches();
         this.time = time;
         registerMetrics(metrics, metricGrpName, metricTags);
     }
@@ -146,9 +151,8 @@ public final class RecordAccumulator {
             RecordBatch last = dq.peekLast();
             if (last != null) {
                 FutureRecordMetadata future = last.tryAppend(key, value, callback);
-                if (future != null) {
+                if (future != null)
                     return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
-                }
             }
         }
 
@@ -161,8 +165,7 @@ public final class RecordAccumulator {
             if (last != null) {
                 FutureRecordMetadata future = last.tryAppend(key, value, callback);
                 if (future != null) {
-                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
-                    // often...
+                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                     free.deallocate(buffer);
                     return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
@@ -172,6 +175,7 @@ public final class RecordAccumulator {
             FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
 
             dq.addLast(batch);
+            incomplete.add(batch);
             return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
         }
     }
@@ -226,7 +230,7 @@ public final class RecordAccumulator {
                         long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                         boolean full = deque.size() > 1 || batch.records.isFull();
                         boolean expired = waitedTimeMs >= timeToWaitMs;
-                        boolean sendable = full || expired || exhausted || closed;
+                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                         if (sendable && !backingOff) {
                             readyNodes.add(leader);
                         } else {
@@ -266,7 +270,6 @@ public final class RecordAccumulator {
      * @param maxSize The maximum number of bytes to drain
      * @param now The current unix time in milliseconds
      * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
-     *         TODO: There may be a starvation issue due to iteration order
      */
     public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
         if (nodes.isEmpty())
@@ -324,8 +327,32 @@ public final class RecordAccumulator {
      * Deallocate the record batch
      */
     public void deallocate(RecordBatch batch) {
+        incomplete.remove(batch);
         free.deallocate(batch.records.buffer(), batch.records.capacity());
     }
+    
+    /**
+     * Are there any threads currently waiting on a flush?
+     */
+    private boolean flushInProgress() {
+        return flushesInProgress.get() > 0;
+    }
+    
+    /**
+     * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
+     */
+    public void beginFlush() {
+        this.flushesInProgress.getAndIncrement();
+    }
+    
+    /**
+     * Block until every batch that is currently incomplete has completed, then mark this flush as finished
+     */
+    public void awaitFlushCompletion() throws InterruptedException {
+        for (RecordBatch batch: this.incomplete.all())
+            batch.produceFuture.await();
+        this.flushesInProgress.decrementAndGet();
+    }
 
     /**
      * Close this accumulator and force all the record buffers to be drained
@@ -334,7 +361,9 @@ public final class RecordAccumulator {
         this.closed = true;
     }
 
-
+    /*
+     * Metadata about a record just appended to the record accumulator
+     */
     public final static class RecordAppendResult {
         public final FutureRecordMetadata future;
         public final boolean batchIsFull;
@@ -347,6 +376,9 @@ public final class RecordAccumulator {
         }
     }
 
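+    /**
+     * The result of a readiness check: the set of nodes that have at least one complete record batch in the
+     * accumulator, the delay until the next readiness check, and whether any unknown leaders exist
+     */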
     public final static class ReadyCheckResult {
         public final Set<Node> readyNodes;
         public final long nextReadyCheckDelayMs;
@@ -358,4 +390,35 @@ public final class RecordAccumulator {
             this.unknownLeadersExist = unknownLeadersExist;
         }
     }
+    
+    /**
+     * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet. Every access to the backing
+     * set happens while holding the set's own monitor.
+     */
+    private final static class IncompleteRecordBatches {
+        private final Set<RecordBatch> incomplete;
+
+        public IncompleteRecordBatches() {
+            incomplete = new HashSet<RecordBatch>();
+        }
+
+        /**
+         * Start tracking the given batch
+         */
+        public void add(RecordBatch batch) {
+            synchronized (incomplete) {
+                incomplete.add(batch);
+            }
+        }
+
+        /**
+         * Stop tracking the given batch
+         *
+         * @throws IllegalStateException if the batch was not being tracked
+         */
+        public void remove(RecordBatch batch) {
+            synchronized (incomplete) {
+                if (incomplete.remove(batch))
+                    return;
+                throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
+            }
+        }
+
+        /**
+         * Take a snapshot of the tracked batches; the result is a private copy made under the lock, so callers
+         * can iterate it without holding the lock
+         */
+        public Iterable<RecordBatch> all() {
+            synchronized (incomplete) {
+                List<RecordBatch> snapshot = new ArrayList<RecordBatch>(incomplete);
+                return snapshot;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index dd0af8a..06182db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -16,6 +16,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
@@ -39,7 +40,7 @@ public final class RecordBatch {
     public long lastAttemptMs;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
-    private final ProduceRequestResult produceFuture;
+    public final ProduceRequestResult produceFuture;
     private final List<Thunk> thunks;
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
@@ -77,7 +78,6 @@ public final class RecordBatch {
      * @param exception The exception that occurred (or null if the request was successful)
      */
     public void done(long baseOffset, RuntimeException exception) {
-        this.produceFuture.done(topicPartition, baseOffset, exception);
         log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
                   topicPartition,
                   baseOffset,
@@ -86,14 +86,17 @@ public final class RecordBatch {
         for (int i = 0; i < this.thunks.size(); i++) {
             try {
                 Thunk thunk = this.thunks.get(i);
-                if (exception == null)
-                    thunk.callback.onCompletion(thunk.future.get(), null);
-                else
+                if (exception == null) {
+                    RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset());
+                    thunk.callback.onCompletion(metadata, null);
+                } else {
                     thunk.callback.onCompletion(null, exception);
+                }
             } catch (Exception e) {
                 log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
             }
         }
+        this.produceFuture.done(topicPartition, baseOffset, exception);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
new file mode 100644
index 0000000..fee322f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * An unchecked wrapper for InterruptedException
+ * <p>
+ * Both constructors also call {@code Thread.currentThread().interrupt()}, so the interrupt status of the
+ * thread that constructs this exception is set again at the point where the checked exception is converted.
+ */
+public class InterruptException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+    
+    /**
+     * Wrap the given exception and re-interrupt the current thread
+     *
+     * @param cause The InterruptedException being wrapped
+     */
+    public InterruptException(InterruptedException cause) {
+        super(cause);
+        Thread.currentThread().interrupt();
+    }
+    
+    /**
+     * Wrap the given exception with a descriptive message and re-interrupt the current thread
+     *
+     * @param message The detail message passed to the superclass
+     * @param cause The InterruptedException being wrapped
+     */
+    public InterruptException(String message, InterruptedException cause) {
+        super(message, cause);
+        Thread.currentThread().interrupt();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index d682bd4..18725de 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -36,7 +36,7 @@ public class SystemTime implements Time {
         try {
             Thread.sleep(ms);
         } catch (InterruptedException e) {
-            // no stress
+            // just wake up early
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
new file mode 100644
index 0000000..928087d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link Metadata}: refresh backoff and expiry timing, and bounded waits in awaitUpdate
+ */
+public class MetadataTest {
+
+    private long refreshBackoffMs = 100;
+    private long metadataExpireMs = 1000;
+    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+    // set by the threads started in asyncFetch() when awaitUpdate throws; checked after every test
+    private AtomicBoolean backgroundError = new AtomicBoolean(false);
+
+    /**
+     * Fail the test if any background fetch thread recorded an exception
+     */
+    @After
+    public void tearDown() {
+        assertFalse("A background fetch thread hit an unexpected exception.", backgroundError.get());
+    }
+
+    /**
+     * Walks the metadata through one update cycle: no update is due right after an update, a requested update
+     * becomes due only once the refresh backoff has elapsed, threads waiting for a topic are released by the
+     * next update, and an update becomes due again once the metadata has expired
+     */
+    @Test
+    public void testMetadata() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        metadata.requestUpdate();
+        assertFalse("Still no update needed due to backoff", metadata.timeToNextUpdate(time) == 0);
+        time += refreshBackoffMs;
+        assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
+        String topic = "my-topic";
+        Thread t1 = asyncFetch(topic);
+        Thread t2 = asyncFetch(topic);
+        assertTrue("Awaiting update", t1.isAlive());
+        assertTrue("Awaiting update", t2.isAlive());
+        metadata.update(TestUtils.singletonCluster(topic, 1), time);
+        t1.join();
+        t2.join();
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        time += metadataExpireMs;
+        assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
+    }
+
+    /**
+     * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
+     * wait forever with a max timeout value of 0, and that a 2000 ms wait also ends in a
+     * TimeoutException when no update arrives
+     *
+     * @throws Exception
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-1836">KAFKA-1836</a>
+     */
+    @Test
+    public void testMetadataUpdateWaitTime() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        // first try with a max wait time of 0 and ensure that this returns back without waiting forever
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), 0);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+        // now try with a higher timeout value once
+        final long twoSecondWait = 2000;
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+    }
+
+    /**
+     * Start a thread that keeps requesting and awaiting metadata updates until the given topic has partitions
+     * in the fetched cluster. Any exception thrown by awaitUpdate is recorded in backgroundError, which
+     * tearDown() checks.
+     *
+     * @param topic The topic to wait for
+     * @return The started thread
+     */
+    private Thread asyncFetch(final String topic) {
+        Thread thread = new Thread() {
+            public void run() {
+                while (metadata.fetch().partitionsForTopic(topic) == null) {
+                    try {
+                        metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
+                    } catch (Exception e) {
+                        backgroundError.set(true);
+                    }
+                }
+            }
+        };
+        thread.start();
+        return thread;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
deleted file mode 100644
index 4ae43ed..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.clients.producer.internals.BufferPool;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.*;
-
-public class BufferPoolTest {
-    private MockTime time = new MockTime();
-    private Metrics metrics = new Metrics(time);
-    String metricGroup = "TestMetrics";
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
-    /**
-     * Test the simple non-blocking allocation paths
-     */
-    @Test
-    public void testSimple() throws Exception {
-        long totalMemory = 64 * 1024;
-        int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(size);
-        assertEquals("Buffer size should equal requested size.", size, buffer.limit());
-        assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
-        assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
-        buffer.putInt(1);
-        buffer.flip();
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(size);
-        assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
-        assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(2 * size);
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
-    }
-
-    /**
-     * Test that we cannot try to allocate more memory then we have in the whole pool
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024);
-        assertEquals(1024, buffer.limit());
-        pool.deallocate(buffer);
-        buffer = pool.allocate(1025);
-    }
-
-    @Test
-    public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
-        pool.allocate(1);
-        try {
-            pool.allocate(2);
-            fail("The buffer allocated more than it has!");
-        } catch (BufferExhaustedException e) {
-            // this is good
-        }
-    }
-
-    /**
-     * Test that delayed allocation blocks
-     */
-    @Test
-    public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
-        ByteBuffer buffer = pool.allocate(1024);
-        CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
-        CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
-        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
-        doDealloc.countDown(); // return the memory
-        allocation.await();
-    }
-
-    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
-        final CountDownLatch latch = new CountDownLatch(1);
-        Thread thread = new Thread() {
-            public void run() {
-                try {
-                    latch.await();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                pool.deallocate(buffer);
-            }
-        };
-        thread.start();
-        return latch;
-    }
-
-    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
-        final CountDownLatch completed = new CountDownLatch(1);
-        Thread thread = new Thread() {
-            public void run() {
-                try {
-                    pool.allocate(size);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } finally {
-                    completed.countDown();
-                }
-            }
-        };
-        thread.start();
-        return completed;
-    }
-
-    /**
-     * This test creates lots of threads that hammer on the pool
-     */
-    @Test
-    public void testStressfulSituation() throws Exception {
-        int numThreads = 10;
-        final int iterations = 50000;
-        final int poolableSize = 1024;
-        final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
-        List<StressTestThread> threads = new ArrayList<StressTestThread>();
-        for (int i = 0; i < numThreads; i++)
-            threads.add(new StressTestThread(pool, iterations));
-        for (StressTestThread thread : threads)
-            thread.start();
-        for (StressTestThread thread : threads)
-            thread.join();
-        for (StressTestThread thread : threads)
-            assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
-        assertEquals(totalMemory, pool.availableMemory());
-    }
-
-    public static class StressTestThread extends Thread {
-        private final int iterations;
-        private final BufferPool pool;
-        public final AtomicBoolean success = new AtomicBoolean(false);
-
-        public StressTestThread(BufferPool pool, int iterations) {
-            this.iterations = iterations;
-            this.pool = pool;
-        }
-
-        public void run() {
-            try {
-                for (int i = 0; i < iterations; i++) {
-                    int size;
-                    if (TestUtils.RANDOM.nextBoolean())
-                        // allocate poolable size
-                        size = pool.poolableSize();
-                    else
-                        // allocate a random size
-                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
-                    ByteBuffer buffer = pool.allocate(size);
-                    pool.deallocate(buffer);
-                }
-                success.set(true);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
deleted file mode 100644
index 743aa7e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class MetadataTest {
-
-    private long refreshBackoffMs = 100;
-    private long metadataExpireMs = 1000;
-    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
-
-    @Test
-    public void testMetadata() throws Exception {
-        long time = 0;
-        metadata.update(Cluster.empty(), time);
-        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
-        metadata.requestUpdate();
-        assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
-        time += refreshBackoffMs;
-        assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
-        String topic = "my-topic";
-        Thread t1 = asyncFetch(topic);
-        Thread t2 = asyncFetch(topic);
-        assertTrue("Awaiting update", t1.isAlive());
-        assertTrue("Awaiting update", t2.isAlive());
-        metadata.update(TestUtils.singletonCluster(topic, 1), time);
-        t1.join();
-        t2.join();
-        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
-        time += metadataExpireMs;
-        assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
-    }
-
-    /**
-     * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
-     * wait forever with a max timeout value of 0
-     *
-     * @throws Exception
-     * @see https://issues.apache.org/jira/browse/KAFKA-1836
-     */
-    @Test
-    public void testMetadataUpdateWaitTime() throws Exception {
-        long time = 0;
-        metadata.update(Cluster.empty(), time);
-        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
-        // first try with a max wait time of 0 and ensure that this returns back without waiting forever
-        try {
-            metadata.awaitUpdate(metadata.requestUpdate(), 0);
-            fail("Wait on metadata update was expected to timeout, but it didn't");
-        } catch (TimeoutException te) {
-            // expected
-        }
-        // now try with a higher timeout value once
-        final long twoSecondWait = 2000;
-        try {
-            metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
-            fail("Wait on metadata update was expected to timeout, but it didn't");
-        } catch (TimeoutException te) {
-            // expected
-        }
-    }
-
-    private Thread asyncFetch(final String topic) {
-        Thread thread = new Thread() {
-            public void run() {
-                while (metadata.fetch().partitionsForTopic(topic) == null) {
-                    try {
-                        metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
-                    } catch (TimeoutException e) {
-                        // let it go
-                    }
-                }
-            }
-        };
-        thread.start();
-        return thread;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 75513b0..6372f1a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -67,6 +67,12 @@ public class MockProducerTest {
             assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
+        
+        Future<RecordMetadata> md3 = producer.send(record1);
+        Future<RecordMetadata> md4 = producer.send(record2);
+        assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone());
+        producer.flush();
+        assertTrue("Requests should be completed.", md3.isDone() && md4.isDone());
     }
 
     private boolean isError(Future<?> future) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
deleted file mode 100644
index 404bedb..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.kafka.clients.producer.internals.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-
-public class PartitionerTest {
-
-    private byte[] key = "key".getBytes();
-    private Partitioner partitioner = new Partitioner();
-    private Node node0 = new Node(0, "localhost", 99);
-    private Node node1 = new Node(1, "localhost", 100);
-    private Node node2 = new Node(2, "localhost", 101);
-    private Node[] nodes = new Node[] {node0, node1, node2};
-    private String topic = "test";
-    // Intentionally make the partition list not in partition order to test the edge cases.
-    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
-                                                    new PartitionInfo(topic, 2, node1, nodes, nodes),
-                                                    new PartitionInfo(topic, 0, node0, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
-
-    @Test
-    public void testUserSuppliedPartitioning() {
-        assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
-    }
-
-    @Test
-    public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition("test", key, null, cluster);
-        assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
-    }
-
-    @Test
-    public void testRoundRobinWithUnavailablePartitions() {
-        // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
-        // and (2) the available partitions are selected in a round robin way.
-        int countForPart0 = 0;
-        int countForPart2 = 0;
-        for (int i = 1; i <= 100; i++) {
-            int part = partitioner.partition("test", null, null, cluster);
-            assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
-            if (part == 0)
-                countForPart0++;
-            else
-                countForPart2++;
-        }
-        assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
deleted file mode 100644
index 8333863..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kafka.clients.producer.internals.RecordAccumulator;
-import org.apache.kafka.clients.producer.internals.RecordBatch;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.MockTime;
-import org.junit.Test;
-
-public class RecordAccumulatorTest {
-
-    private String topic = "test";
-    private int partition1 = 0;
-    private int partition2 = 1;
-    private int partition3 = 2;
-    private Node node1 = new Node(0, "localhost", 1111);
-    private Node node2 = new Node(1, "localhost", 1112);
-    private TopicPartition tp1 = new TopicPartition(topic, partition1);
-    private TopicPartition tp2 = new TopicPartition(topic, partition2);
-    private TopicPartition tp3 = new TopicPartition(topic, partition3);
-    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
-    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
-    private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
-    private MockTime time = new MockTime();
-    private byte[] key = "key".getBytes();
-    private byte[] value = "value".getBytes();
-    private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
-    private Metrics metrics = new Metrics(time);
-    String metricGroup = "TestMetrics";
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
-    @Test
-    public void testFull() throws Exception {
-        long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time,  metricTags);
-        int appends = 1024 / msgSize;
-        for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
-            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
-        }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
-        assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
-        Iterator<LogEntry> iter = batch.records.iterator();
-        for (int i = 0; i < appends; i++) {
-            LogEntry entry = iter.next();
-            assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-            assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
-        }
-        assertFalse("No more records", iter.hasNext());
-    }
-
-    @Test
-    public void testAppendLarge() throws Exception {
-        int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-    }
-
-    @Test
-    public void testLinger() throws Exception {
-        long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
-        time.sleep(10);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
-        assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
-        Iterator<LogEntry> iter = batch.records.iterator();
-        LogEntry entry = iter.next();
-        assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-        assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
-        assertFalse("No more records", iter.hasNext());
-    }
-
-    @Test
-    public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
-        int appends = 1024 / msgSize + 1;
-        List<TopicPartition> partitions = asList(tp1, tp2);
-        for (TopicPartition tp : partitions) {
-            for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
-        }
-        assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
-        assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
-    }
-
-    @SuppressWarnings("unused")
-    @Test
-    public void testStressfulSituation() throws Exception {
-        final int numThreads = 5;
-        final int msgs = 10000;
-        final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
-        List<Thread> threads = new ArrayList<Thread>();
-        for (int i = 0; i < numThreads; i++) {
-            threads.add(new Thread() {
-                public void run() {
-                    for (int i = 0; i < msgs; i++) {
-                        try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            });
-        }
-        for (Thread t : threads)
-            t.start();
-        int read = 0;
-        long now = time.milliseconds();
-        while (read < numThreads * msgs) {
-            Set<Node> nodes = accum.ready(cluster, now).readyNodes;
-            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
-            if (batches != null) {
-                for (RecordBatch batch : batches) {
-                    for (LogEntry entry : batch.records)
-                        read++;
-                    accum.deallocate(batch);
-                }
-            }
-        }
-
-        for (Thread t : threads)
-            t.join();
-    }
-
-
-    @Test
-    public void testNextReadyCheckDelay() throws Exception {
-        // Next check time will use lingerMs since this test won't trigger any retries/backoff
-        long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        // Just short of going over the limit so we trigger linger time
-        int appends = 1024 / msgSize;
-
-        // Partition on node1 only
-        for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
-        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
-        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
-        assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
-
-        time.sleep(lingerMs / 2);
-
-        // Add partition on node2 only
-        for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
-        result = accum.ready(cluster, time.milliseconds());
-        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
-        assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
-
-        // Add data for another partition on node1, enough to make data sendable immediately
-        for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
-        result = accum.ready(cluster, time.milliseconds());
-        assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
-        // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
-        // but have leaders with other sendable data.
-        assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
deleted file mode 100644
index 558942a..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.producer.internals.RecordAccumulator;
-import org.apache.kafka.clients.producer.internals.Sender;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SenderTest {
-
-    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
-    private static final short ACKS_ALL = -1;
-    private static final int MAX_RETRIES = 0;
-    private static final int REQUEST_TIMEOUT_MS = 10000;
-
-    private TopicPartition tp = new TopicPartition("test", 0);
-    private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
-    private int batchSize = 16 * 1024;
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private Cluster cluster = TestUtils.singletonCluster("test", 1);
-    private Metrics metrics = new Metrics(time);
-    Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
-    private Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       MAX_RETRIES,
-                                       REQUEST_TIMEOUT_MS,
-                                       metrics,
-                                       time,
-                                       "clientId");
-
-    @Before
-    public void setup() {
-        metadata.update(cluster, time.milliseconds());
-    }
-
-    @Test
-    public void testSimple() throws Exception {
-        long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
-        assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
-        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
-        sender.run(time.milliseconds());
-        assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
-        sender.run(time.milliseconds());
-        assertTrue("Request should be completed", future.isDone());
-        assertEquals(offset, future.get().offset());
-    }
-
-    @Test
-    public void testRetries() throws Exception {
-        // create a sender with retries = 1
-        int maxRetries = 1;
-        Sender sender = new Sender(client,
-                                   metadata,
-                                   this.accumulator,
-                                   MAX_REQUEST_SIZE,
-                                   ACKS_ALL,
-                                   maxRetries,
-                                   REQUEST_TIMEOUT_MS,
-                                   new Metrics(),
-                                   time,
-                                   "clientId");
-        // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
-        sender.run(time.milliseconds()); // connect
-        sender.run(time.milliseconds()); // send produce request
-        assertEquals(1, client.inFlightRequestCount());
-        client.disconnect(client.requests().peek().request().destination());
-        assertEquals(0, client.inFlightRequestCount());
-        sender.run(time.milliseconds()); // receive error
-        sender.run(time.milliseconds()); // reconnect
-        sender.run(time.milliseconds()); // resend
-        assertEquals(1, client.inFlightRequestCount());
-        long offset = 0;
-        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
-        sender.run(time.milliseconds());
-        assertTrue("Request should have retried and completed", future.isDone());
-        assertEquals(offset, future.get().offset());
-
-        // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
-        sender.run(time.milliseconds()); // send produce request
-        for (int i = 0; i < maxRetries + 1; i++) {
-            client.disconnect(client.requests().peek().request().destination());
-            sender.run(time.milliseconds()); // receive error
-            sender.run(time.milliseconds()); // reconnect
-            sender.run(time.milliseconds()); // resend
-        }
-        sender.run(time.milliseconds());
-        completedWithError(future, Errors.NETWORK_EXCEPTION);
-    }
-
-    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
-        assertTrue("Request should be completed", future.isDone());
-        try {
-            future.get();
-            fail("Should have thrown an exception.");
-        } catch (ExecutionException e) {
-            assertEquals(error.exception().getClass(), e.getCause().getClass());
-        }
-    }
-
-    private Struct produceResponse(String topic, int part, long offset, int error) {
-        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
-        Struct response = struct.instance("responses");
-        response.set("topic", topic);
-        Struct partResp = response.instance("partition_responses");
-        partResp.set("partition", part);
-        partResp.set("error_code", (short) error);
-        partResp.set("base_offset", offset);
-        response.set("partition_responses", new Object[] {partResp});
-        struct.set("responses", new Object[] {response});
-        return struct;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
new file mode 100644
index 0000000..2c69382
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class BufferPoolTest {
+    private MockTime time = new MockTime();
+    private Metrics metrics = new Metrics(time);
+    String metricGroup = "TestMetrics";
+    Map<String, String> metricTags = new LinkedHashMap<String, String>();
+
+    /**
+     * Test the simple non-blocking allocation paths
+     */
+    @Test
+    public void testSimple() throws Exception {
+        long totalMemory = 64 * 1024;
+        int size = 1024;
+        BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(size);
+        assertEquals("Buffer size should equal requested size.", size, buffer.limit());
+        assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
+        assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
+        buffer.putInt(1);
+        buffer.flip();
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
+        buffer = pool.allocate(size);
+        assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
+        assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
+        buffer = pool.allocate(2 * size);
+        pool.deallocate(buffer);
+        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+        assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
+    }
+
+    /**
+     * Test that we cannot try to allocate more memory than we have in the whole pool
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
+        BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024);
+        assertEquals(1024, buffer.limit());
+        pool.deallocate(buffer);
+        buffer = pool.allocate(1025);
+    }
+
+    @Test
+    public void testNonblockingMode() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
+        pool.allocate(1);
+        try {
+            pool.allocate(2);
+            fail("The buffer allocated more than it has!");
+        } catch (BufferExhaustedException e) {
+            // this is good
+        }
+    }
+
+    /**
+     * Test that delayed allocation blocks
+     */
+    @Test
+    public void testDelayedAllocation() throws Exception {
+        BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
+        ByteBuffer buffer = pool.allocate(1024);
+        CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
+        CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
+        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
+        doDealloc.countDown(); // return the memory
+        allocation.await();
+    }
+
+    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        Thread thread = new Thread() {
+            public void run() {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                pool.deallocate(buffer);
+            }
+        };
+        thread.start();
+        return latch;
+    }
+
+    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
+        final CountDownLatch completed = new CountDownLatch(1);
+        Thread thread = new Thread() {
+            public void run() {
+                try {
+                    pool.allocate(size);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } finally {
+                    completed.countDown();
+                }
+            }
+        };
+        thread.start();
+        return completed;
+    }
+
+    /**
+     * This test creates lots of threads that hammer on the pool
+     */
+    @Test
+    public void testStressfulSituation() throws Exception {
+        int numThreads = 10;
+        final int iterations = 50000;
+        final int poolableSize = 1024;
+        final long totalMemory = numThreads / 2 * poolableSize;
+        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
+        List<StressTestThread> threads = new ArrayList<StressTestThread>();
+        for (int i = 0; i < numThreads; i++)
+            threads.add(new StressTestThread(pool, iterations));
+        for (StressTestThread thread : threads)
+            thread.start();
+        for (StressTestThread thread : threads)
+            thread.join();
+        for (StressTestThread thread : threads)
+            assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
+        assertEquals(totalMemory, pool.availableMemory());
+    }
+
+    public static class StressTestThread extends Thread {
+        private final int iterations;
+        private final BufferPool pool;
+        public final AtomicBoolean success = new AtomicBoolean(false);
+
+        public StressTestThread(BufferPool pool, int iterations) {
+            this.iterations = iterations;
+            this.pool = pool;
+        }
+
+        public void run() {
+            try {
+                for (int i = 0; i < iterations; i++) {
+                    int size;
+                    if (TestUtils.RANDOM.nextBoolean())
+                        // allocate poolable size
+                        size = pool.poolableSize();
+                    else
+                        // allocate a random size
+                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
+                    ByteBuffer buffer = pool.allocate(size);
+                    pool.deallocate(buffer);
+                }
+                success.set(true);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+}