You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/05/13 00:41:32 UTC

[3/3] kafka git commit: KAFKA-1660; Add API to the producer to support close with a timeout; reviewed by Joel Koshy and Jay Kreps.

KAFKA-1660; Add API to the producer to support close with a timeout; reviewed by Joel Koshy and Jay Kreps.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33af0cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33af0cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33af0cba

Branch: refs/heads/trunk
Commit: 33af0cba3bff87874ae6cef61900cd065edad064
Parents: 2d5e0f0
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 12 15:31:07 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue May 12 15:31:07 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  84 ++++++++++---
 .../kafka/clients/producer/MockProducer.java    |   5 +
 .../apache/kafka/clients/producer/Producer.java |   7 ++
 .../producer/internals/RecordAccumulator.java   | 120 +++++++++++++-----
 .../clients/producer/internals/Sender.java      |  19 ++-
 .../kafka/common/errors/InterruptException.java |   5 +
 .../kafka/common/serialization/Serializer.java  |   6 +
 .../internals/RecordAccumulatorTest.java        |  26 ++++
 .../kafka/api/ProducerSendTest.scala            | 124 +++++++++++++++++--
 9 files changed, 332 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 42b1292..8e336a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -265,13 +265,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             } else {
                 this.valueSerializer = valueSerializer;
             }
-
             config.logUnused();
             log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed
             // this is to prevent resource leak. see KAFKA-2121
-            close(true);
+            close(0, TimeUnit.MILLISECONDS, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka producer", t);
         }
@@ -518,40 +517,87 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     /**
      * Close this producer. This method blocks until all previously sent requests complete.
+     * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
+     * <p>
+     * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
+     * will be called instead. We do this because the sender thread would otherwise try to join itself and
+     * block forever.</strong>
+     * <p>
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void close() {
-        close(false);
+        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
+     * <p>
+     * If the producer is unable to complete all requests before the timeout expires, this method will fail
+     * any unsent and unacknowledged records immediately.
+     * <p>
+     * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+     * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
+     * blocking the I/O thread of the producer.
+     *
+     * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
+     *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
+     * @param timeUnit The time unit for the <code>timeout</code>
+     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws IllegalArgumentException If the <code>timeout</code> is negative.
+     */
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+        close(timeout, timeUnit, false);
     }
+    
+    private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
+        if (timeout < 0)
+            throw new IllegalArgumentException("The timeout cannot be negative.");
 
-    private void close(boolean swallowException) {
-        log.trace("Closing the Kafka producer.");
+        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
         // this will keep track of the first encountered exception
         AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
-        if (this.sender != null) {
-            try {
-                this.sender.initiateClose();
-            } catch (Throwable t) {
-                firstException.compareAndSet(null, t);
-                log.error("Failed to close sender", t);
+        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
+        if (timeout > 0) {
+            if (invokedFromCallback) {
+                log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
+                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
+            } else {
+                // Try to close gracefully.
+                if (this.sender != null)
+                    this.sender.initiateClose();
+                if (this.ioThread != null) {
+                    try {
+                        this.ioThread.join(timeUnit.toMillis(timeout));
+                    } catch (InterruptedException t) {
+                        firstException.compareAndSet(null, t);
+                        log.error("Interrupted while joining ioThread", t);
+                    }
+                }
             }
         }
-        if (this.ioThread != null) {
-            try {
-                this.ioThread.join();
-            } catch (InterruptedException t) {
-                firstException.compareAndSet(null, t);
-                log.error("Interrupted while joining ioThread", t);
+
+        if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
+            log.info("Proceeding to force close the producer since pending requests could not be completed " +
+                "within timeout {} ms.", timeout);
+            this.sender.forceClose();
+            // Only join the sender thread when not calling from callback.
+            if (!invokedFromCallback) {
+                try {
+                    this.ioThread.join();
+                } catch (InterruptedException e) {
+                    firstException.compareAndSet(null, e);
+                }
             }
         }
+
         ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
         log.debug("The Kafka producer has closed.");
-        if (firstException.get() != null && !swallowException) {
+        if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", firstException.get());
-        }
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6913090..3c34610 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
@@ -146,6 +147,10 @@ public class MockProducer implements Producer<byte[], byte[]> {
     public void close() {
     }
 
+    @Override
+    public void close(long timeout, TimeUnit timeUnit) {
+    }
+
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 5b3e75e..d4a5d39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -67,4 +68,10 @@ public interface Producer<K, V> extends Closeable {
      */
     public void close();
 
+    /**
+     * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
+     * timeout, fail any pending send requests and force close the producer.
+     */
+    public void close(long timeout, TimeUnit unit);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 49a9883..87dbd64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -56,8 +56,9 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
-    private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
+    private final AtomicInteger flushesInProgress;
+    private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
     private final long lingerMs;
@@ -67,6 +68,7 @@ public final class RecordAccumulator {
     private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
     private final IncompleteRecordBatches incomplete;
 
+
     /**
      * Create a new record accumulator
      * 
@@ -96,6 +98,7 @@ public final class RecordAccumulator {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
+        this.appendsInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
         this.compression = compression;
         this.lingerMs = lingerMs;
@@ -146,40 +149,50 @@ public final class RecordAccumulator {
      * @param callback The user-supplied callback to execute when the request is complete
      */
     public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
-        if (closed)
-            throw new IllegalStateException("Cannot send after the producer is closed.");
-        // check if we have an in-progress batch
-        Deque<RecordBatch> dq = dequeFor(tp);
-        synchronized (dq) {
-            RecordBatch last = dq.peekLast();
-            if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, callback);
-                if (future != null)
-                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+        // We keep track of the number of appending threads to make sure we do not miss batches in
+        // abortIncompleteBatches().
+        appendsInProgress.incrementAndGet();
+        try {
+            if (closed)
+                throw new IllegalStateException("Cannot send after the producer is closed.");
+            // check if we have an in-progress batch
+            Deque<RecordBatch> dq = dequeFor(tp);
+            synchronized (dq) {
+                RecordBatch last = dq.peekLast();
+                if (last != null) {
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    if (future != null)
+                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+                }
             }
-        }
 
-        // we don't have an in-progress record batch try to allocate a new batch
-        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
-        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
-        ByteBuffer buffer = free.allocate(size);
-        synchronized (dq) {
-            RecordBatch last = dq.peekLast();
-            if (last != null) {
-                FutureRecordMetadata future = last.tryAppend(key, value, callback);
-                if (future != null) {
-                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
-                    free.deallocate(buffer);
-                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+            // we don't have an in-progress record batch try to allocate a new batch
+            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
+            ByteBuffer buffer = free.allocate(size);
+            synchronized (dq) {
+                // Need to check if producer is closed again after grabbing the deque lock.
+                if (closed)
+                    throw new IllegalStateException("Cannot send after the producer is closed.");
+                RecordBatch last = dq.peekLast();
+                if (last != null) {
+                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                    if (future != null) {
+                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+                        free.deallocate(buffer);
+                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+                    }
                 }
-            }
-            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
-            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
-            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
+                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
 
-            dq.addLast(batch);
-            incomplete.add(batch);
-            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+                dq.addLast(batch);
+                incomplete.add(batch);
+                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+            }
+        } finally {
+            appendsInProgress.decrementAndGet();
         }
     }
 
@@ -351,7 +364,14 @@ public final class RecordAccumulator {
     public void beginFlush() {
         this.flushesInProgress.getAndIncrement();
     }
-    
+
+    /**
+     * Are there any threads currently appending messages?
+     */
+    private boolean appendsInProgress() {
+        return appendsInProgress.get() > 0;
+    }
+
     /**
      * Mark all partitions as ready to send and block until the send is complete
      */
@@ -362,6 +382,40 @@ public final class RecordAccumulator {
     }
 
     /**
+     * This function is only called when sender is closed forcefully. It will fail all the
+     * incomplete batches and return.
+     */
+    public void abortIncompleteBatches() {
+        // We need to keep aborting the incomplete batches until no thread is trying to append, in order to:
+        // 1. Avoid losing batches.
+        // 2. Free up memory in case appending threads are blocked on buffer full.
+        // This is a tight loop but should be able to get through very quickly.
+        do {
+            abortBatches();
+        } while (appendsInProgress());
+        // After this point, no thread will append any messages because they will see the close
+        // flag set. We need to do the last abort after no thread was appending in case there was a new
+        // batch appended by the last appending thread.
+        abortBatches();
+        this.batches.clear();
+    }
+
+    /**
+     * Go through incomplete batches and abort them.
+     */
+    private void abortBatches() {
+        for (RecordBatch batch : incomplete.all()) {
+            Deque<RecordBatch> dq = dequeFor(batch.topicPartition);
+            // Close the batch before aborting
+            synchronized (dq) {
+                batch.records.close();
+            }
+            batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
+            deallocate(batch);
+        }
+    }
+
+    /**
      * Close this accumulator and force all the record buffers to be drained
      */
     public void close() {
@@ -403,7 +457,7 @@ public final class RecordAccumulator {
      */
     private final static class IncompleteRecordBatches {
         private final Set<RecordBatch> incomplete;
-        
+
         public IncompleteRecordBatches() {
             this.incomplete = new HashSet<RecordBatch>();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b2db91c..1e943d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -83,6 +83,9 @@ public class Sender implements Runnable {
     /* true while the sender thread is still running */
     private volatile boolean running;
 
+    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
+    private volatile boolean forceClose;
+
     /* metrics */
     private final SenderMetrics sensors;
 
@@ -132,13 +135,18 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
+        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
             try {
                 run(time.milliseconds());
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
         }
+        if (forceClose) {
+            // We need to fail all the incomplete batches and wake up the threads waiting on
+            // the futures.
+            this.accumulator.abortIncompleteBatches();
+        }
         try {
             this.client.close();
         } catch (Exception e) {
@@ -181,7 +189,6 @@ public class Sender implements Runnable {
                                                                          now);
         sensors.updateProduceRequestMetrics(batches);
         List<ClientRequest> requests = createProduceRequests(batches, now);
-
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
         // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
         // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
@@ -212,6 +219,14 @@ public class Sender implements Runnable {
     }
 
     /**
+     * Closes the sender without sending out any pending messages.
+     */
+    public void forceClose() {
+        this.forceClose = true;
+        initiateClose();
+    }
+
+    /**
      * Handle a produce response
      */
     private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
index fee322f..3680f1b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -31,4 +31,9 @@ public class InterruptException extends KafkaException {
         Thread.currentThread().interrupt();
     }
 
+    public InterruptException(String message) {
+        super(message, new InterruptedException());
+        Thread.currentThread().interrupt();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 16a67a2..88033b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -38,6 +38,12 @@ public interface Serializer<T> extends Closeable {
      */
     public byte[] serialize(String topic, T data);
 
+
+    /**
+     * Close this serializer.
+     * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
+     * multiple times.
+     */
     @Override
     public void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index baa48e7..5b2e4ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -26,7 +26,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -265,4 +268,27 @@ public class RecordAccumulatorTest {
         assertFalse(accum.hasUnsent());
     }
 
+    @Test
+    public void testAbortIncompleteBatches() throws Exception {
+        long lingerMs = Long.MAX_VALUE;
+        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        class TestCallback implements Callback {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertTrue(exception.getMessage().equals("Producer is closed forcefully."));
+                numExceptionReceivedInCallback.incrementAndGet();
+            }
+        }
+        for (int i = 0; i < 100; i++)
+            accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
+        RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+        assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+
+        accum.abortIncompleteBatches();
+        assertEquals(numExceptionReceivedInCallback.get(), 100);
+        assertFalse(accum.hasUnsent());
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 9811a2b..9ce4bd5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,20 +17,21 @@
 
 package kafka.api
 
-import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import org.junit.Assert._
+import java.util.Properties
+import java.util.concurrent.TimeUnit
 
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
 import kafka.consumer.SimpleConsumer
-import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.errors.SerializationException
-import java.util.Properties
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
 
 
 class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -318,6 +319,109 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
         producer.close()
     }
   }
-  
 
+  /**
+   * Test close with zero timeout from caller thread
+   */
+  @Test
+  def testCloseWithZeroTimeoutFromCallerThread() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create record
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+
+      // Test closing from caller thread.
+      for(i <- 0 until 50) {
+        producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+        val responses = (0 until numRecords) map (i => producer.send(record0))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        producer.close(0, TimeUnit.MILLISECONDS)
+        responses.foreach { future =>
+          try {
+            future.get()
+            fail("No message should be sent successfully.")
+          } catch {
+            case e: Exception =>
+              assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
+          }
+        }
+        val fetchResponse = if (leader0.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        }
+        assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
+
+  /**
+   * Test close with zero and non-zero timeout from sender thread
+   */
+  @Test
+  def testCloseWithZeroTimeoutFromSenderThread() {
+    var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val leader0 = leaders(0)
+      val leader1 = leaders(1)
+
+      // create record
+      val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+      val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+
+      // Test closing from sender thread.
+      class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+        override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+          // Trigger another batch in accumulator before closing the producer. These messages should
+          // not be sent.
+          (0 until numRecords) map (i => producer.send(record1))
+          // The close call will be called by all the message callbacks. This tests idempotence of the close call.
+          producer.close(0, TimeUnit.MILLISECONDS)
+          // Test close with non zero timeout. Should not block at all.
+          producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
+        }
+      }
+      for(i <- 0 until 50) {
+        producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+        // send message to partition 0
+        var responses = (0 until numRecords) map (i => producer.send(record0))
+        // send message to partition 1
+        responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer))))
+        assertTrue("No request is complete.", responses.forall(!_.isDone()))
+        // flush the messages.
+        producer.flush()
+        assertTrue("All request are complete.", responses.forall(_.isDone()))
+        // Check the messages received by broker.
+        val fetchResponse0 = if (leader0.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+        }
+        val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        } else {
+          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+        }
+        val expectedNumRecords = (i + 1) * numRecords
+        assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+          expectedNumRecords, fetchResponse0.messageSet(topic, 0).size)
+        assertEquals("Fetch response to partition 1 should have %d messages.".format(expectedNumRecords),
+          expectedNumRecords, fetchResponse1.messageSet(topic, 1).size)
+      }
+    } finally {
+      if (producer != null)
+        producer.close()
+    }
+  }
 }