You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/05/13 00:41:32 UTC
[3/3] kafka git commit: KAFKA-1660;
Add API to the producer to support close with a timeout;
reviewed by Joel Koshy and Jay Kreps.
KAFKA-1660; Add API to the producer to support close with a timeout; reviewed by Joel Koshy and Jay Kreps.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33af0cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33af0cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33af0cba
Branch: refs/heads/trunk
Commit: 33af0cba3bff87874ae6cef61900cd065edad064
Parents: 2d5e0f0
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 12 15:31:07 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue May 12 15:31:07 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 84 ++++++++++---
.../kafka/clients/producer/MockProducer.java | 5 +
.../apache/kafka/clients/producer/Producer.java | 7 ++
.../producer/internals/RecordAccumulator.java | 120 +++++++++++++-----
.../clients/producer/internals/Sender.java | 19 ++-
.../kafka/common/errors/InterruptException.java | 5 +
.../kafka/common/serialization/Serializer.java | 6 +
.../internals/RecordAccumulatorTest.java | 26 ++++
.../kafka/api/ProducerSendTest.scala | 124 +++++++++++++++++--
9 files changed, 332 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 42b1292..8e336a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -265,13 +265,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
} else {
this.valueSerializer = valueSerializer;
}
-
config.logUnused();
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
- close(true);
+ close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
@@ -518,40 +517,87 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
/**
* Close this producer. This method blocks until all previously sent requests complete.
+ * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
+ * <p>
+ * <strong>If close() is called from a {@link Callback}, a warning message will be logged and
+ * <code>close(0, TimeUnit.MILLISECONDS)</code> will be called instead. We do this because the sender thread
+ * would otherwise try to join itself and block forever.</strong>
+ *
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void close() {
- close(false);
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
+ * <p>
+ * If the producer is unable to complete all requests before the timeout expires, this method will fail
+ * any unsent and unacknowledged records immediately.
+ * <p>
+ * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+ * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
+ * blocking the I/O thread of the producer.
+ *
+ * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
+ * non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
+ * @param timeUnit The time unit for the <code>timeout</code>
+ * @throws InterruptException If the thread is interrupted while blocked
+ * @throws IllegalArgumentException If the <code>timeout</code> is negative.
+ */
+ @Override
+ public void close(long timeout, TimeUnit timeUnit) {
+ close(timeout, timeUnit, false);
}
+
+ private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
+ if (timeout < 0)
+ throw new IllegalArgumentException("The timeout cannot be negative.");
- private void close(boolean swallowException) {
- log.trace("Closing the Kafka producer.");
+ log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
// this will keep track of the first encountered exception
AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
- if (this.sender != null) {
- try {
- this.sender.initiateClose();
- } catch (Throwable t) {
- firstException.compareAndSet(null, t);
- log.error("Failed to close sender", t);
+ boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
+ if (timeout > 0) {
+ if (invokedFromCallback) {
+ log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
+ "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
+ } else {
+ // Try to close gracefully.
+ if (this.sender != null)
+ this.sender.initiateClose();
+ if (this.ioThread != null) {
+ try {
+ this.ioThread.join(timeUnit.toMillis(timeout));
+ } catch (InterruptedException t) {
+ firstException.compareAndSet(null, t);
+ log.error("Interrupted while joining ioThread", t);
+ }
+ }
}
}
- if (this.ioThread != null) {
- try {
- this.ioThread.join();
- } catch (InterruptedException t) {
- firstException.compareAndSet(null, t);
- log.error("Interrupted while joining ioThread", t);
+
+ if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
+ log.info("Proceeding to force close the producer since pending requests could not be completed " +
+ "within timeout {} ms.", timeout);
+ this.sender.forceClose();
+ // Only join the sender thread when not calling from callback.
+ if (!invokedFromCallback) {
+ try {
+ this.ioThread.join();
+ } catch (InterruptedException e) {
+ firstException.compareAndSet(null, e);
+ }
}
}
+
ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
log.debug("The Kafka producer has closed.");
- if (firstException.get() != null && !swallowException) {
+ if (firstException.get() != null && !swallowException)
throw new KafkaException("Failed to close kafka producer", firstException.get());
- }
}
private static class FutureFailure implements Future<RecordMetadata> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6913090..3c34610 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
@@ -146,6 +147,10 @@ public class MockProducer implements Producer<byte[], byte[]> {
public void close() {
}
+ @Override
+ public void close(long timeout, TimeUnit timeUnit) {
+ }
+
/**
* Get the list of sent records since the last call to {@link #clear()}
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 5b3e75e..d4a5d39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
@@ -67,4 +68,10 @@ public interface Producer<K, V> extends Closeable {
*/
public void close();
+ /**
+ * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
+ * timeout, fail any pending send requests and force close the producer.
+ */
+ public void close(long timeout, TimeUnit unit);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 49a9883..87dbd64 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -56,8 +56,9 @@ public final class RecordAccumulator {
private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
private volatile boolean closed;
- private volatile AtomicInteger flushesInProgress;
private int drainIndex;
+ private final AtomicInteger flushesInProgress;
+ private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final long lingerMs;
@@ -67,6 +68,7 @@ public final class RecordAccumulator {
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
private final IncompleteRecordBatches incomplete;
+
/**
* Create a new record accumulator
*
@@ -96,6 +98,7 @@ public final class RecordAccumulator {
this.drainIndex = 0;
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
+ this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.lingerMs = lingerMs;
@@ -146,40 +149,50 @@ public final class RecordAccumulator {
* @param callback The user-supplied callback to execute when the request is complete
*/
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
- if (closed)
- throw new IllegalStateException("Cannot send after the producer is closed.");
- // check if we have an in-progress batch
- Deque<RecordBatch> dq = dequeFor(tp);
- synchronized (dq) {
- RecordBatch last = dq.peekLast();
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(key, value, callback);
- if (future != null)
- return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+ // We keep track of the number of appending threads to make sure we do not miss batches in
+ // abortIncompleteBatches().
+ appendsInProgress.incrementAndGet();
+ try {
+ if (closed)
+ throw new IllegalStateException("Cannot send after the producer is closed.");
+ // check if we have an in-progress batch
+ Deque<RecordBatch> dq = dequeFor(tp);
+ synchronized (dq) {
+ RecordBatch last = dq.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(key, value, callback);
+ if (future != null)
+ return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+ }
}
- }
- // we don't have an in-progress record batch try to allocate a new batch
- int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
- log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
- ByteBuffer buffer = free.allocate(size);
- synchronized (dq) {
- RecordBatch last = dq.peekLast();
- if (last != null) {
- FutureRecordMetadata future = last.tryAppend(key, value, callback);
- if (future != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
- free.deallocate(buffer);
- return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+ // we don't have an in-progress record batch try to allocate a new batch
+ int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+ log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
+ ByteBuffer buffer = free.allocate(size);
+ synchronized (dq) {
+ // Need to check if producer is closed again after grabbing the deque lock.
+ if (closed)
+ throw new IllegalStateException("Cannot send after the producer is closed.");
+ RecordBatch last = dq.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(key, value, callback);
+ if (future != null) {
+ // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+ free.deallocate(buffer);
+ return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+ }
}
- }
- MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
- RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
- FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
+ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
+ RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
- dq.addLast(batch);
- incomplete.add(batch);
- return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+ dq.addLast(batch);
+ incomplete.add(batch);
+ return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
+ }
+ } finally {
+ appendsInProgress.decrementAndGet();
}
}
@@ -351,7 +364,14 @@ public final class RecordAccumulator {
public void beginFlush() {
this.flushesInProgress.getAndIncrement();
}
-
+
+ /**
+ * Are there any threads currently appending messages?
+ */
+ private boolean appendsInProgress() {
+ return appendsInProgress.get() > 0;
+ }
+
/**
* Mark all partitions as ready to send and block until the send is complete
*/
@@ -362,6 +382,40 @@ public final class RecordAccumulator {
}
/**
+ * This function is only called when sender is closed forcefully. It will fail all the
+ * incomplete batches and return.
+ */
+ public void abortIncompleteBatches() {
+ // We need to keep aborting the incomplete batches until no thread is trying to append, in order to:
+ // 1. Avoid losing batches.
+ // 2. Free up memory in case appending threads are blocked on buffer full.
+ // This is a tight loop but should be able to get through very quickly.
+ do {
+ abortBatches();
+ } while (appendsInProgress());
+ // After this point, no thread will append any messages because they will see the close
+ // flag set. We need to do one last abort after no thread is appending, in case a new
+ // batch was appended by the last appending thread.
+ abortBatches();
+ this.batches.clear();
+ }
+
+ /**
+ * Go through incomplete batches and abort them.
+ */
+ private void abortBatches() {
+ for (RecordBatch batch : incomplete.all()) {
+ Deque<RecordBatch> dq = dequeFor(batch.topicPartition);
+ // Close the batch before aborting
+ synchronized (dq) {
+ batch.records.close();
+ }
+ batch.done(-1L, new IllegalStateException("Producer is closed forcefully."));
+ deallocate(batch);
+ }
+ }
+
+ /**
* Close this accumulator and force all the record buffers to be drained
*/
public void close() {
@@ -403,7 +457,7 @@ public final class RecordAccumulator {
*/
private final static class IncompleteRecordBatches {
private final Set<RecordBatch> incomplete;
-
+
public IncompleteRecordBatches() {
this.incomplete = new HashSet<RecordBatch>();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b2db91c..1e943d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -83,6 +83,9 @@ public class Sender implements Runnable {
/* true while the sender thread is still running */
private volatile boolean running;
+ /* true when the caller wants to ignore all unsent/inflight messages and force close. */
+ private volatile boolean forceClose;
+
/* metrics */
private final SenderMetrics sensors;
@@ -132,13 +135,18 @@ public class Sender implements Runnable {
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
- while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
+ while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
+ if (forceClose) {
+ // We need to fail all the incomplete batches and wake up the threads waiting on
+ // the futures.
+ this.accumulator.abortIncompleteBatches();
+ }
try {
this.client.close();
} catch (Exception e) {
@@ -181,7 +189,6 @@ public class Sender implements Runnable {
now);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
-
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
@@ -212,6 +219,14 @@ public class Sender implements Runnable {
}
/**
+ * Closes the sender without sending out any pending messages.
+ */
+ public void forceClose() {
+ this.forceClose = true;
+ initiateClose();
+ }
+
+ /**
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
index fee322f..3680f1b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -31,4 +31,9 @@ public class InterruptException extends KafkaException {
Thread.currentThread().interrupt();
}
+ public InterruptException(String message) {
+ super(message, new InterruptedException());
+ Thread.currentThread().interrupt();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 16a67a2..88033b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -38,6 +38,12 @@ public interface Serializer<T> extends Closeable {
*/
public byte[] serialize(String topic, T data);
+
+ /**
+ * Close this serializer.
+ * This method has to be idempotent if the serializer is used in KafkaProducer because it might be called
+ * multiple times.
+ */
@Override
public void close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index baa48e7..5b2e4ff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -26,7 +26,10 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -265,4 +268,27 @@ public class RecordAccumulatorTest {
assertFalse(accum.hasUnsent());
}
+ @Test
+ public void testAbortIncompleteBatches() throws Exception {
+ long lingerMs = Long.MAX_VALUE;
+ final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+ final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+ class TestCallback implements Callback {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ assertTrue(exception.getMessage().equals("Producer is closed forcefully."));
+ numExceptionReceivedInCallback.incrementAndGet();
+ }
+ }
+ for (int i = 0; i < 100; i++)
+ accum.append(new TopicPartition(topic, i % 3), key, value, new TestCallback());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+
+ accum.abortIncompleteBatches();
+ assertEquals(numExceptionReceivedInCallback.get(), 100);
+ assertFalse(accum.hasUnsent());
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/33af0cba/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 9811a2b..9ce4bd5 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -17,20 +17,21 @@
package kafka.api
-import org.apache.kafka.clients.producer._
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
-import org.junit.Assert._
+import java.util.Properties
+import java.util.concurrent.TimeUnit
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
import kafka.consumer.SimpleConsumer
-import kafka.message.Message
import kafka.integration.KafkaServerTestHarness
-import org.apache.kafka.common.errors.SerializationException
-import java.util.Properties
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer._
import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -318,6 +319,109 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
producer.close()
}
}
-
+ /**
+ * Test close with zero timeout from caller thread
+ */
+ @Test
+ def testCloseWithZeroTimeoutFromCallerThread() {
+ var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+ try {
+ // create topic
+ val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ val leader0 = leaders(0)
+ val leader1 = leaders(1)
+
+ // create record
+ val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+ val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+
+ // Test closing from caller thread.
+ for(i <- 0 until 50) {
+ producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+ val responses = (0 until numRecords) map (i => producer.send(record0))
+ assertTrue("No request is complete.", responses.forall(!_.isDone()))
+ producer.close(0, TimeUnit.MILLISECONDS)
+ responses.foreach { future =>
+ try {
+ future.get()
+ fail("No message should be sent successfully.")
+ } catch {
+ case e: Exception =>
+ assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
+ }
+ }
+ val fetchResponse = if (leader0.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ }
+ assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic, 0).size)
+ }
+ } finally {
+ if (producer != null)
+ producer.close()
+ }
+ }
+
+ /**
+ * Test close with zero and non-zero timeout from sender thread
+ */
+ @Test
+ def testCloseWithZeroTimeoutFromSenderThread() {
+ var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
+ try {
+ // create topic
+ val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ val leader0 = leaders(0)
+ val leader1 = leaders(1)
+
+ // create record
+ val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+ val record1 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 1, null, "value".getBytes)
+
+ // Test closing from sender thread.
+ class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback {
+ override def onCompletion(metadata: RecordMetadata, exception: Exception) {
+ // Trigger another batch in accumulator before closing the producer. These messages should
+ // not be sent.
+ (0 until numRecords) map (i => producer.send(record1))
+ // The close call will be called by all the message callbacks. This tests idempotence of the close call.
+ producer.close(0, TimeUnit.MILLISECONDS)
+ // Test close with non zero timeout. Should not block at all.
+ producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
+ }
+ }
+ for(i <- 0 until 50) {
+ producer = TestUtils.createNewProducer(brokerList, lingerMs = Long.MaxValue)
+ // send message to partition 0
+ var responses = (0 until numRecords) map (i => producer.send(record0))
+ // send message to partition 1
+ responses ++= ((0 until numRecords) map (i => producer.send(record1, new CloseCallback(producer))))
+ assertTrue("No request is complete.", responses.forall(!_.isDone()))
+ // flush the messages.
+ producer.flush()
+ assertTrue("All request are complete.", responses.forall(_.isDone()))
+ // Check the messages received by broker.
+ val fetchResponse0 = if (leader0.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+ }
+ val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+ consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+ } else {
+ consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
+ }
+ val expectedNumRecords = (i + 1) * numRecords
+ assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
+ expectedNumRecords, fetchResponse0.messageSet(topic, 0).size)
+ assertEquals("Fetch response to partition 1 should have %d messages.".format(expectedNumRecords),
+ expectedNumRecords, fetchResponse1.messageSet(topic, 1).size)
+ }
+ } finally {
+ if (producer != null)
+ producer.close()
+ }
+ }
}