You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/03/01 00:51:58 UTC
[2/2] kafka git commit: KAFKA-1865 Add a flush() method to the
producer.
KAFKA-1865 Add a flush() method to the producer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0636928d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0636928d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0636928d
Branch: refs/heads/trunk
Commit: 0636928d961a6ceaab46d908f9372d913c3e5faf
Parents: 22ff9e9
Author: Jay Kreps <ja...@gmail.com>
Authored: Sat Feb 7 12:01:51 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sat Feb 28 14:11:59 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 10 +-
.../kafka/clients/producer/KafkaProducer.java | 187 ++++++++++++---
.../kafka/clients/producer/MockProducer.java | 5 +
.../apache/kafka/clients/producer/Producer.java | 5 +
.../kafka/clients/producer/ProducerRecord.java | 20 +-
.../internals/FutureRecordMetadata.java | 10 +-
.../producer/internals/RecordAccumulator.java | 77 ++++++-
.../clients/producer/internals/RecordBatch.java | 13 +-
.../kafka/common/errors/InterruptException.java | 34 +++
.../apache/kafka/common/utils/SystemTime.java | 2 +-
.../org/apache/kafka/clients/MetadataTest.java | 103 +++++++++
.../kafka/clients/producer/BufferPoolTest.java | 193 ----------------
.../kafka/clients/producer/MetadataTest.java | 95 --------
.../clients/producer/MockProducerTest.java | 6 +
.../kafka/clients/producer/PartitionerTest.java | 69 ------
.../clients/producer/RecordAccumulatorTest.java | 207 -----------------
.../kafka/clients/producer/SenderTest.java | 155 -------------
.../producer/internals/BufferPoolTest.java | 193 ++++++++++++++++
.../producer/internals/PartitionerTest.java | 68 ++++++
.../internals/RecordAccumulatorTest.java | 228 +++++++++++++++++++
.../clients/producer/internals/SenderTest.java | 154 +++++++++++++
.../kafka/api/ProducerSendTest.scala | 62 +++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 4 +-
23 files changed, 1097 insertions(+), 803 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index e8afecd..c8bde8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -99,19 +99,15 @@ public final class Metadata {
/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
- public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
- try {
- if (remainingWaitMs != 0) {
- wait(remainingWaitMs);
- }
- } catch (InterruptedException e) { /* this is fine */
- }
+ if (remainingWaitMs != 0)
+ wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1fd6917..7397e56 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -55,10 +56,66 @@ import org.slf4j.LoggerFactory;
/**
* A Kafka client that publishes records to the Kafka cluster.
* <P>
- * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than
+ * having multiple instances.
* <p>
- * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these resources.
+ * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value
+ * pairs.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("acks", "all");
+ * props.put("retries", 0);
+ * props.put("batch.size", 16384);
+ * props.put("linger.ms", 1);
+ * props.put("buffer.memory", 33554432);
+ * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ *
+ * Producer<String, String> producer = new KafkaProducer<String, String>(props);
+ * for(int i = 0; i < 100; i++)
+ * producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
+ *
+ * producer.close();
+ * }</pre>
+ * <p>
+ * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
+ * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
+ * to the cluster. Failure to close the producer after use will leak these resources.
+ * <p>
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends
+ * and immediately returns. This allows the producer to batch together individual records for efficiency.
+ * <p>
+ * The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting
+ * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
+ * <p>
+ * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
+ * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
+ * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
+ * <p>
+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
+ * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
+ * generally have one of these buffers for each active partition).
+ * <p>
+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
+ * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
+ * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
+ * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
+ * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
+ * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
+ * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
+ * efficient requests when not under maximal load at the cost of a small amount of latency.
+ * <p>
+ * The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records
+ * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
+ * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which
+ * will cause the send call to result in an exception.
+ * <p>
+ * The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with
+ * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
+ * {@link org.apache.kafka.common.serialization.StringSerializer} for simple byte array or string types.
*/
public class KafkaProducer<K, V> implements Producer<K, V> {
@@ -241,8 +298,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
/**
- * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
- * @param record The record to be sent
+ * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
+ * See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@@ -261,53 +318,59 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* <p>
* Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
* {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
- * get()} on this future will result in the metadata for the record or throw any exception that occurred while
- * sending the record.
+ * get()} on this future will block until the associated request completes and then return the metadata for the record
+ * or throw any exception that occurred while sending the record.
* <p>
- * If you want to simulate a simple blocking call you can do the following:
+ * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
*
- * <pre>{@code
- * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
+ * <pre>
+ * {@code
+ * byte[] key = "key".getBytes();
+ * byte[] value = "value".getBytes();
+ * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
+ * producer.send(record).get();
* }</pre>
* <p>
- * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
+ * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
*
- * <pre>{@code
- * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
- * producer.send(myRecord,
- * new Callback() {
- * public void onCompletion(RecordMetadata metadata, Exception e) {
- * if(e != null)
- * e.printStackTrace();
- * System.out.println("The offset of the record we just sent is: " + metadata.offset());
- * }
- * });
- * }</pre>
+ * <pre>
+ * {@code
+ * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
+ * producer.send(record,
+ * new Callback() {
+ * public void onCompletion(RecordMetadata metadata, Exception e) {
+ * if(e != null)
+ * e.printStackTrace();
+ * else
+ * System.out.println("The offset of the record we just sent is: " + metadata.offset());
+ * }
+ * });
+ * }
+ * </pre>
*
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
*
- * <pre>{@code
+ * <pre>
+ * {@code
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
- * }</pre>
+ * }
+ * </pre>
* <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
* they will delay the sending of messages from other threads. If you want to execute blocking or computationally
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
- * <p>
- * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is
- * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
- * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
- * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
- * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
- * producer to instead throw an exception when buffer memory is exhausted.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
+ *
+ * @throws InterruptException If the thread is interrupted while blocked
+ * @throws SerializationException If the key or value are not valid objects given the configured serializers
+ * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
+ *
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -352,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
- throw new KafkaException(e);
+ throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
throw e;
@@ -364,7 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum time in ms for waiting on the metadata
*/
- private void waitOnMetadata(String topic, long maxWaitMs) {
+ private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
if (metadata.fetch().partitionsForTopic(topic) != null) {
return;
} else {
@@ -399,20 +462,72 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
+
+ /**
+ * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
+ * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
+ * of <code>flush()</code> is that any previously sent record will have completed (i.e. <code>Future.isDone() == true</code>).
+ * A request is considered completed when it is successfully acknowledged
+ * according to the <code>acks</code> configuration you have specified or else it results in an error.
+ * <p>
+ * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
+ * however no guarantee is made about the completion of records sent after the flush call begins.
+ * <p>
+ * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
+ * gives a convenient way to ensure all previously sent messages have actually completed.
+ * <p>
+ * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
+ * <pre>
+ * {@code
+ * for(ConsumerRecord<String, String> record: consumer.poll(100))
+ * producer.send(new ProducerRecord<String, String>("my-topic", record.key(), record.value()));
+ * producer.flush();
+ * consumer.commit();
+ * }
+ * </pre>
+ *
+ * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
+ * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
+ *
+ * @throws InterruptException If the thread is interrupted while blocked
+ */
+ @Override
+ public void flush() {
+ log.trace("Flushing accumulated records in producer.");
+ this.accumulator.beginFlush();
+ this.sender.wakeup();
+ try {
+ this.accumulator.awaitFlushCompletion();
+ } catch (InterruptedException e) {
+ throw new InterruptException("Flush interrupted.", e);
+ }
+ }
+ /**
+ * Get the partition metadata for the given topic. This can be used for custom partitioning.
+ * @throws InterruptException If the thread is interrupted while blocked
+ */
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+ try {
+ waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+ } catch (InterruptedException e) {
+ throw new InterruptException(e);
+ }
return this.metadata.fetch().partitionsForTopic(topic);
}
+ /**
+ * Get the full set of internal metrics maintained by the producer.
+ */
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}
/**
- * Close this producer. This method blocks until all in-flight requests complete.
+ * Close this producer. This method blocks until all previously sent requests complete.
+ * @throws InterruptException If the thread is interrupted while blocked
*/
@Override
public void close() {
@@ -421,7 +536,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
try {
this.ioThread.join();
} catch (InterruptedException e) {
- throw new KafkaException(e);
+ throw new InterruptException(e);
}
this.metrics.close();
this.keySerializer.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 84530f2..6913090 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -128,6 +128,11 @@ public class MockProducer implements Producer<byte[], byte[]> {
return offset;
}
}
+
+ public synchronized void flush() {
+ while (!this.completions.isEmpty())
+ completeNext();
+ }
public List<PartitionInfo> partitionsFor(String topic) {
return this.cluster.partitionsForTopic(topic);
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 17fe541..5b3e75e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -45,6 +45,11 @@ public interface Producer<K, V> extends Closeable {
* Send a record and invoke the given callback when the record has been acknowledged by the server
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
+
+ /**
+ * Flush any accumulated records from the producer. Blocks until all sends are complete.
+ */
+ public void flush();
/**
* Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 4990692..75cd51e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -102,15 +102,21 @@ public final class ProducerRecord<K, V> {
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof ProducerRecord)) return false;
+ if (this == o)
+ return true;
+ else if (!(o instanceof ProducerRecord))
+ return false;
- ProducerRecord that = (ProducerRecord) o;
+ ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o;
- if (key != null ? !key.equals(that.key) : that.key != null) return false;
- if (partition != null ? !partition.equals(that.partition) : that.partition != null) return false;
- if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
- if (value != null ? !value.equals(that.value) : that.value != null) return false;
+ if (key != null ? !key.equals(that.key) : that.key != null)
+ return false;
+ else if (partition != null ? !partition.equals(that.partition) : that.partition != null)
+ return false;
+ else if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+ return false;
+ else if (value != null ? !value.equals(that.value) : that.value != null)
+ return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index 4a2da41..e2d9ca8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -51,13 +51,17 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> {
return valueOrError();
}
- private RecordMetadata valueOrError() throws ExecutionException {
+ RecordMetadata valueOrError() throws ExecutionException {
if (this.result.error() != null)
throw new ExecutionException(this.result.error());
else
- return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+ return value();
}
-
+
+ RecordMetadata value() {
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+ }
+
public long relativeOffset() {
return this.relativeOffset;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index ecfe214..d5c79e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
@@ -55,6 +56,7 @@ public final class RecordAccumulator {
private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
private volatile boolean closed;
+ private volatile AtomicInteger flushesInProgress;
private int drainIndex;
private final int batchSize;
private final long lingerMs;
@@ -62,6 +64,7 @@ public final class RecordAccumulator {
private final BufferPool free;
private final Time time;
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+ private final IncompleteRecordBatches incomplete;
/**
* Create a new record accumulator
@@ -89,12 +92,14 @@ public final class RecordAccumulator {
Map<String, String> metricTags) {
this.drainIndex = 0;
this.closed = false;
+ this.flushesInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
+ this.incomplete = new IncompleteRecordBatches();
this.time = time;
registerMetrics(metrics, metricGrpName, metricTags);
}
@@ -146,9 +151,8 @@ public final class RecordAccumulator {
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback);
- if (future != null) {
+ if (future != null)
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
- }
}
}
@@ -161,8 +165,7 @@ public final class RecordAccumulator {
if (last != null) {
FutureRecordMetadata future = last.tryAppend(key, value, callback);
if (future != null) {
- // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
- // often...
+ // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
@@ -172,6 +175,7 @@ public final class RecordAccumulator {
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
dq.addLast(batch);
+ incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
}
@@ -226,7 +230,7 @@ public final class RecordAccumulator {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.records.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
- boolean sendable = full || expired || exhausted || closed;
+ boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
@@ -266,7 +270,6 @@ public final class RecordAccumulator {
* @param maxSize The maximum number of bytes to drain
* @param now The current unix time in milliseconds
* @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
- * TODO: There may be a starvation issue due to iteration order
*/
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
@@ -324,8 +327,32 @@ public final class RecordAccumulator {
* Deallocate the record batch
*/
public void deallocate(RecordBatch batch) {
+ incomplete.remove(batch);
free.deallocate(batch.records.buffer(), batch.records.capacity());
}
+
+ /**
+ * Are there any threads currently waiting on a flush?
+ */
+ private boolean flushInProgress() {
+ return flushesInProgress.get() > 0;
+ }
+
+ /**
+ * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
+ */
+ public void beginFlush() {
+ this.flushesInProgress.getAndIncrement();
+ }
+
+ /**
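+ * Block until the produce future of every batch that is incomplete at the time of the call has completed, then
+ * decrement the count of flushes in progress. Note that the count is not decremented if the wait is interrupted.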
+ */
+ public void awaitFlushCompletion() throws InterruptedException {
+ for (RecordBatch batch: this.incomplete.all())
+ batch.produceFuture.await();
+ this.flushesInProgress.decrementAndGet();
+ }
/**
* Close this accumulator and force all the record buffers to be drained
@@ -334,7 +361,9 @@ public final class RecordAccumulator {
this.closed = true;
}
-
+ /*
+ * Metadata about a record just appended to the record accumulator
+ */
public final static class RecordAppendResult {
public final FutureRecordMetadata future;
public final boolean batchIsFull;
@@ -347,6 +376,9 @@ public final class RecordAccumulator {
}
}
+ /*
+ * The set of nodes that have at least one complete record batch in the accumulator
+ */
public final static class ReadyCheckResult {
public final Set<Node> readyNodes;
public final long nextReadyCheckDelayMs;
@@ -358,4 +390,35 @@ public final class RecordAccumulator {
this.unknownLeadersExist = unknownLeadersExist;
}
}
+
+ /*
+ * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
+ */
+ private final static class IncompleteRecordBatches {
+ private final Set<RecordBatch> incomplete;
+
+ public IncompleteRecordBatches() {
+ this.incomplete = new HashSet<RecordBatch>();
+ }
+
+ public void add(RecordBatch batch) {
+ synchronized (incomplete) {
+ this.incomplete.add(batch);
+ }
+ }
+
+ public void remove(RecordBatch batch) {
+ synchronized (incomplete) {
+ boolean removed = this.incomplete.remove(batch);
+ if (!removed)
+ throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
+ }
+ }
+
+ public Iterable<RecordBatch> all() {
+ synchronized (incomplete) {
+ return new ArrayList<RecordBatch>(this.incomplete);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index dd0af8a..06182db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -16,6 +16,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
@@ -39,7 +40,7 @@ public final class RecordBatch {
public long lastAttemptMs;
public final MemoryRecords records;
public final TopicPartition topicPartition;
- private final ProduceRequestResult produceFuture;
+ public final ProduceRequestResult produceFuture;
private final List<Thunk> thunks;
public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
@@ -77,7 +78,6 @@ public final class RecordBatch {
* @param exception The exception that occurred (or null if the request was successful)
*/
public void done(long baseOffset, RuntimeException exception) {
- this.produceFuture.done(topicPartition, baseOffset, exception);
log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
topicPartition,
baseOffset,
@@ -86,14 +86,17 @@ public final class RecordBatch {
for (int i = 0; i < this.thunks.size(); i++) {
try {
Thunk thunk = this.thunks.get(i);
- if (exception == null)
- thunk.callback.onCompletion(thunk.future.get(), null);
- else
+ if (exception == null) {
+ RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset());
+ thunk.callback.onCompletion(metadata, null);
+ } else {
thunk.callback.onCompletion(null, exception);
+ }
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
}
}
+ this.produceFuture.done(topicPartition, baseOffset, exception);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
new file mode 100644
index 0000000..fee322f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * An unchecked wrapper for InterruptedException. Constructing it sets the interrupt flag on the current thread.
+ */
+public class InterruptException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+ /** Wraps {@code cause} and interrupts the thread that is constructing this exception. */
+ public InterruptException(InterruptedException cause) {
+ super(cause);
+ Thread.currentThread().interrupt();
+ }
+ /** Wraps {@code cause} with a detail message and interrupts the thread that is constructing this exception. */
+ public InterruptException(String message, InterruptedException cause) {
+ super(message, cause);
+ Thread.currentThread().interrupt();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index d682bd4..18725de 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -36,7 +36,7 @@ public class SystemTime implements Time {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
- // no stress
+ // just wake up early
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
new file mode 100644
index 0000000..928087d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MetadataTest {
+
+ private long refreshBackoffMs = 100;
+ private long metadataExpireMs = 1000;
+ private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+ private AtomicBoolean backgroundError = new AtomicBoolean(false);
+ /** Fails the test if a thread started by asyncFetch recorded an exception. */
+ @After
+ public void tearDown() {
+ assertFalse(backgroundError.get());
+ }
+ /** Checks timeToNextUpdate across backoff and expiry, and that the fetch threads finish once the topic's metadata is supplied. */
+ @Test
+ public void testMetadata() throws Exception {
+ long time = 0;
+ metadata.update(Cluster.empty(), time);
+ assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+ metadata.requestUpdate();
+ assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
+ time += refreshBackoffMs;
+ assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
+ String topic = "my-topic";
+ Thread t1 = asyncFetch(topic);
+ Thread t2 = asyncFetch(topic);
+ assertTrue("Awaiting update", t1.isAlive());
+ assertTrue("Awaiting update", t2.isAlive());
+ metadata.update(TestUtils.singletonCluster(topic, 1), time);
+ t1.join();
+ t2.join();
+ assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+ time += metadataExpireMs;
+ assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
+ }
+
+ /**
+ * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
+ * wait forever with a max timeout value of 0, and also times out with a two second max wait
+ *
+ * @throws Exception
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-1836">KAFKA-1836</a>
+ */
+ @Test
+ public void testMetadataUpdateWaitTime() throws Exception {
+ long time = 0;
+ metadata.update(Cluster.empty(), time);
+ assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+ // first try with a max wait time of 0 and ensure that this returns back without waiting forever
+ try {
+ metadata.awaitUpdate(metadata.requestUpdate(), 0);
+ fail("Wait on metadata update was expected to timeout, but it didn't");
+ } catch (TimeoutException te) {
+ // expected
+ }
+ // now try with a higher timeout value once
+ final long twoSecondWait = 2000;
+ try {
+ metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
+ fail("Wait on metadata update was expected to timeout, but it didn't");
+ } catch (TimeoutException te) {
+ // expected
+ }
+ }
+ /** Starts and returns a thread that keeps requesting and awaiting metadata updates until partitionsForTopic(topic) is non-null. */
+ private Thread asyncFetch(final String topic) {
+ Thread thread = new Thread() {
+ public void run() {
+ while (metadata.fetch().partitionsForTopic(topic) == null) {
+ try {
+ metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
+ } catch (Exception e) {
+ backgroundError.set(true); // checked by tearDown(); the loop itself keeps retrying
+ }
+ }
+ }
+ };
+ thread.start();
+ return thread;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
deleted file mode 100644
index 4ae43ed..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.clients.producer.internals.BufferPool;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.*;
-
-public class BufferPoolTest {
- private MockTime time = new MockTime();
- private Metrics metrics = new Metrics(time);
- String metricGroup = "TestMetrics";
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
- /**
- * Test the simple non-blocking allocation paths
- */
- @Test
- public void testSimple() throws Exception {
- long totalMemory = 64 * 1024;
- int size = 1024;
- BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
- ByteBuffer buffer = pool.allocate(size);
- assertEquals("Buffer size should equal requested size.", size, buffer.limit());
- assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
- assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
- buffer.putInt(1);
- buffer.flip();
- pool.deallocate(buffer);
- assertEquals("All memory should be available", totalMemory, pool.availableMemory());
- assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
- buffer = pool.allocate(size);
- assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
- assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
- pool.deallocate(buffer);
- assertEquals("All memory should be available", totalMemory, pool.availableMemory());
- assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
- buffer = pool.allocate(2 * size);
- pool.deallocate(buffer);
- assertEquals("All memory should be available", totalMemory, pool.availableMemory());
- assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
- }
-
- /**
- * Test that we cannot try to allocate more memory then we have in the whole pool
- */
- @Test(expected = IllegalArgumentException.class)
- public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
- BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
- ByteBuffer buffer = pool.allocate(1024);
- assertEquals(1024, buffer.limit());
- pool.deallocate(buffer);
- buffer = pool.allocate(1025);
- }
-
- @Test
- public void testNonblockingMode() throws Exception {
- BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
- pool.allocate(1);
- try {
- pool.allocate(2);
- fail("The buffer allocated more than it has!");
- } catch (BufferExhaustedException e) {
- // this is good
- }
- }
-
- /**
- * Test that delayed allocation blocks
- */
- @Test
- public void testDelayedAllocation() throws Exception {
- BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
- ByteBuffer buffer = pool.allocate(1024);
- CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
- CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
- assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
- doDealloc.countDown(); // return the memory
- allocation.await();
- }
-
- private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
- final CountDownLatch latch = new CountDownLatch(1);
- Thread thread = new Thread() {
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- pool.deallocate(buffer);
- }
- };
- thread.start();
- return latch;
- }
-
- private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
- final CountDownLatch completed = new CountDownLatch(1);
- Thread thread = new Thread() {
- public void run() {
- try {
- pool.allocate(size);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- completed.countDown();
- }
- }
- };
- thread.start();
- return completed;
- }
-
- /**
- * This test creates lots of threads that hammer on the pool
- */
- @Test
- public void testStressfulSituation() throws Exception {
- int numThreads = 10;
- final int iterations = 50000;
- final int poolableSize = 1024;
- final int totalMemory = numThreads / 2 * poolableSize;
- final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
- List<StressTestThread> threads = new ArrayList<StressTestThread>();
- for (int i = 0; i < numThreads; i++)
- threads.add(new StressTestThread(pool, iterations));
- for (StressTestThread thread : threads)
- thread.start();
- for (StressTestThread thread : threads)
- thread.join();
- for (StressTestThread thread : threads)
- assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
- assertEquals(totalMemory, pool.availableMemory());
- }
-
- public static class StressTestThread extends Thread {
- private final int iterations;
- private final BufferPool pool;
- public final AtomicBoolean success = new AtomicBoolean(false);
-
- public StressTestThread(BufferPool pool, int iterations) {
- this.iterations = iterations;
- this.pool = pool;
- }
-
- public void run() {
- try {
- for (int i = 0; i < iterations; i++) {
- int size;
- if (TestUtils.RANDOM.nextBoolean())
- // allocate poolable size
- size = pool.poolableSize();
- else
- // allocate a random size
- size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
- ByteBuffer buffer = pool.allocate(size);
- pool.deallocate(buffer);
- }
- success.set(true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
deleted file mode 100644
index 743aa7e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class MetadataTest {
-
- private long refreshBackoffMs = 100;
- private long metadataExpireMs = 1000;
- private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
-
- @Test
- public void testMetadata() throws Exception {
- long time = 0;
- metadata.update(Cluster.empty(), time);
- assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
- metadata.requestUpdate();
- assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
- time += refreshBackoffMs;
- assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
- String topic = "my-topic";
- Thread t1 = asyncFetch(topic);
- Thread t2 = asyncFetch(topic);
- assertTrue("Awaiting update", t1.isAlive());
- assertTrue("Awaiting update", t2.isAlive());
- metadata.update(TestUtils.singletonCluster(topic, 1), time);
- t1.join();
- t2.join();
- assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
- time += metadataExpireMs;
- assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
- }
-
- /**
- * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
- * wait forever with a max timeout value of 0
- *
- * @throws Exception
- * @see https://issues.apache.org/jira/browse/KAFKA-1836
- */
- @Test
- public void testMetadataUpdateWaitTime() throws Exception {
- long time = 0;
- metadata.update(Cluster.empty(), time);
- assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
- // first try with a max wait time of 0 and ensure that this returns back without waiting forever
- try {
- metadata.awaitUpdate(metadata.requestUpdate(), 0);
- fail("Wait on metadata update was expected to timeout, but it didn't");
- } catch (TimeoutException te) {
- // expected
- }
- // now try with a higher timeout value once
- final long twoSecondWait = 2000;
- try {
- metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
- fail("Wait on metadata update was expected to timeout, but it didn't");
- } catch (TimeoutException te) {
- // expected
- }
- }
-
- private Thread asyncFetch(final String topic) {
- Thread thread = new Thread() {
- public void run() {
- while (metadata.fetch().partitionsForTopic(topic) == null) {
- try {
- metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
- } catch (TimeoutException e) {
- // let it go
- }
- }
- }
- };
- thread.start();
- return thread;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 75513b0..6372f1a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -67,6 +67,12 @@ public class MockProducerTest {
assertEquals(e, err.getCause());
}
assertFalse("No more requests to complete", producer.completeNext());
+
+ Future<RecordMetadata> md3 = producer.send(record1);
+ Future<RecordMetadata> md4 = producer.send(record2);
+ assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone());
+ producer.flush();
+ assertTrue("Requests should be completed.", md3.isDone() && md4.isDone());
}
private boolean isError(Future<?> future) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
deleted file mode 100644
index 404bedb..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.kafka.clients.producer.internals.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-
-public class PartitionerTest {
-
- private byte[] key = "key".getBytes();
- private Partitioner partitioner = new Partitioner();
- private Node node0 = new Node(0, "localhost", 99);
- private Node node1 = new Node(1, "localhost", 100);
- private Node node2 = new Node(2, "localhost", 101);
- private Node[] nodes = new Node[] {node0, node1, node2};
- private String topic = "test";
- // Intentionally make the partition list not in partition order to test the edge cases.
- private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
- new PartitionInfo(topic, 2, node1, nodes, nodes),
- new PartitionInfo(topic, 0, node0, nodes, nodes));
- private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
-
- @Test
- public void testUserSuppliedPartitioning() {
- assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
- }
-
- @Test
- public void testKeyPartitionIsStable() {
- int partition = partitioner.partition("test", key, null, cluster);
- assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
- }
-
- @Test
- public void testRoundRobinWithUnavailablePartitions() {
- // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
- // and (2) the available partitions are selected in a round robin way.
- int countForPart0 = 0;
- int countForPart2 = 0;
- for (int i = 1; i <= 100; i++) {
- int part = partitioner.partition("test", null, null, cluster);
- assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
- if (part == 0)
- countForPart0++;
- else
- countForPart2++;
- }
- assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
deleted file mode 100644
index 8333863..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kafka.clients.producer.internals.RecordAccumulator;
-import org.apache.kafka.clients.producer.internals.RecordBatch;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.LogEntry;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.MockTime;
-import org.junit.Test;
-
-public class RecordAccumulatorTest {
-
- private String topic = "test";
- private int partition1 = 0;
- private int partition2 = 1;
- private int partition3 = 2;
- private Node node1 = new Node(0, "localhost", 1111);
- private Node node2 = new Node(1, "localhost", 1112);
- private TopicPartition tp1 = new TopicPartition(topic, partition1);
- private TopicPartition tp2 = new TopicPartition(topic, partition2);
- private TopicPartition tp3 = new TopicPartition(topic, partition3);
- private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null);
- private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
- private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
- private MockTime time = new MockTime();
- private byte[] key = "key".getBytes();
- private byte[] value = "value".getBytes();
- private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
- private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
- private Metrics metrics = new Metrics(time);
- String metricGroup = "TestMetrics";
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
-
- @Test
- public void testFull() throws Exception {
- long now = time.milliseconds();
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
- int appends = 1024 / msgSize;
- for (int i = 0; i < appends; i++) {
- accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
- }
- accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
- assertEquals(1, batches.size());
- RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records.iterator();
- for (int i = 0; i < appends; i++) {
- LogEntry entry = iter.next();
- assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
- assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
- }
- assertFalse("No more records", iter.hasNext());
- }
-
- @Test
- public void testAppendLarge() throws Exception {
- int batchSize = 512;
- RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
- accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
- }
-
- @Test
- public void testLinger() throws Exception {
- long lingerMs = 10L;
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
- accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
- time.sleep(10);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
- assertEquals(1, batches.size());
- RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records.iterator();
- LogEntry entry = iter.next();
- assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
- assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
- assertFalse("No more records", iter.hasNext());
- }
-
- @Test
- public void testPartialDrain() throws Exception {
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
- int appends = 1024 / msgSize + 1;
- List<TopicPartition> partitions = asList(tp1, tp2);
- for (TopicPartition tp : partitions) {
- for (int i = 0; i < appends; i++)
- accum.append(tp, key, value, CompressionType.NONE, null);
- }
- assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-
- List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
- assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
- }
-
- @SuppressWarnings("unused")
- @Test
- public void testStressfulSituation() throws Exception {
- final int numThreads = 5;
- final int msgs = 10000;
- final int numParts = 2;
- final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < numThreads; i++) {
- threads.add(new Thread() {
- public void run() {
- for (int i = 0; i < msgs; i++) {
- try {
- accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- });
- }
- for (Thread t : threads)
- t.start();
- int read = 0;
- long now = time.milliseconds();
- while (read < numThreads * msgs) {
- Set<Node> nodes = accum.ready(cluster, now).readyNodes;
- List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
- if (batches != null) {
- for (RecordBatch batch : batches) {
- for (LogEntry entry : batch.records)
- read++;
- accum.deallocate(batch);
- }
- }
- }
-
- for (Thread t : threads)
- t.join();
- }
-
-
- @Test
- public void testNextReadyCheckDelay() throws Exception {
- // Next check time will use lingerMs since this test won't trigger any retries/backoff
- long lingerMs = 10L;
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
- // Just short of going over the limit so we trigger linger time
- int appends = 1024 / msgSize;
-
- // Partition on node1 only
- for (int i = 0; i < appends; i++)
- accum.append(tp1, key, value, CompressionType.NONE, null);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
- assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
- assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
-
- time.sleep(lingerMs / 2);
-
- // Add partition on node2 only
- for (int i = 0; i < appends; i++)
- accum.append(tp3, key, value, CompressionType.NONE, null);
- result = accum.ready(cluster, time.milliseconds());
- assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
- assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
-
- // Add data for another partition on node1, enough to make data sendable immediately
- for (int i = 0; i < appends + 1; i++)
- accum.append(tp2, key, value, CompressionType.NONE, null);
- result = accum.ready(cluster, time.milliseconds());
- assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
- // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
- // but have leaders with other sendable data.
- assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
deleted file mode 100644
index 558942a..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.producer.internals.RecordAccumulator;
-import org.apache.kafka.clients.producer.internals.Sender;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-public class SenderTest {
-
- private static final int MAX_REQUEST_SIZE = 1024 * 1024;
- private static final short ACKS_ALL = -1;
- private static final int MAX_RETRIES = 0;
- private static final int REQUEST_TIMEOUT_MS = 10000;
-
- private TopicPartition tp = new TopicPartition("test", 0);
- private MockTime time = new MockTime();
- private MockClient client = new MockClient(time);
- private int batchSize = 16 * 1024;
- private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
- private Cluster cluster = TestUtils.singletonCluster("test", 1);
- private Metrics metrics = new Metrics(time);
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
- private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
- private Sender sender = new Sender(client,
- metadata,
- this.accumulator,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- MAX_RETRIES,
- REQUEST_TIMEOUT_MS,
- metrics,
- time,
- "clientId");
-
- @Before
- public void setup() {
- metadata.update(cluster, time.milliseconds());
- }
-
- @Test
- public void testSimple() throws Exception {
- long offset = 0;
- Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
- sender.run(time.milliseconds()); // connect
- sender.run(time.milliseconds()); // send produce request
- assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
- client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
- sender.run(time.milliseconds());
- assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
- sender.run(time.milliseconds());
- assertTrue("Request should be completed", future.isDone());
- assertEquals(offset, future.get().offset());
- }
-
- @Test
- public void testRetries() throws Exception {
- // create a sender with retries = 1
- int maxRetries = 1;
- Sender sender = new Sender(client,
- metadata,
- this.accumulator,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- maxRetries,
- REQUEST_TIMEOUT_MS,
- new Metrics(),
- time,
- "clientId");
- // do a successful retry
- Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
- sender.run(time.milliseconds()); // connect
- sender.run(time.milliseconds()); // send produce request
- assertEquals(1, client.inFlightRequestCount());
- client.disconnect(client.requests().peek().request().destination());
- assertEquals(0, client.inFlightRequestCount());
- sender.run(time.milliseconds()); // receive error
- sender.run(time.milliseconds()); // reconnect
- sender.run(time.milliseconds()); // resend
- assertEquals(1, client.inFlightRequestCount());
- long offset = 0;
- client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
- sender.run(time.milliseconds());
- assertTrue("Request should have retried and completed", future.isDone());
- assertEquals(offset, future.get().offset());
-
- // do an unsuccessful retry
- future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
- sender.run(time.milliseconds()); // send produce request
- for (int i = 0; i < maxRetries + 1; i++) {
- client.disconnect(client.requests().peek().request().destination());
- sender.run(time.milliseconds()); // receive error
- sender.run(time.milliseconds()); // reconnect
- sender.run(time.milliseconds()); // resend
- }
- sender.run(time.milliseconds());
- completedWithError(future, Errors.NETWORK_EXCEPTION);
- }
-
- private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
- assertTrue("Request should be completed", future.isDone());
- try {
- future.get();
- fail("Should have thrown an exception.");
- } catch (ExecutionException e) {
- assertEquals(error.exception().getClass(), e.getCause().getClass());
- }
- }
-
- private Struct produceResponse(String topic, int part, long offset, int error) {
- Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
- Struct response = struct.instance("responses");
- response.set("topic", topic);
- Struct partResp = response.instance("partition_responses");
- partResp.set("partition", part);
- partResp.set("error_code", (short) error);
- partResp.set("base_offset", offset);
- response.set("partition_responses", new Object[] {partResp});
- struct.set("responses", new Object[] {response});
- return struct;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
new file mode 100644
index 0000000..2c69382
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.*;
+
+public class BufferPoolTest {
+ private MockTime time = new MockTime();
+ private Metrics metrics = new Metrics(time);
+ String metricGroup = "TestMetrics";
+ Map<String, String> metricTags = new LinkedHashMap<String, String>();
+
+ /**
+ * Test the simple non-blocking allocation paths
+ */
+ @Test
+ public void testSimple() throws Exception {
+ long totalMemory = 64 * 1024;
+ int size = 1024;
+ BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
+ ByteBuffer buffer = pool.allocate(size);
+ assertEquals("Buffer size should equal requested size.", size, buffer.limit());
+ assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
+ assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
+ buffer.putInt(1);
+ buffer.flip();
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
+ buffer = pool.allocate(size);
+ assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
+ assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
+ buffer = pool.allocate(2 * size);
+ pool.deallocate(buffer);
+ assertEquals("All memory should be available", totalMemory, pool.availableMemory());
+ assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
+ }
+
+ /**
+ * Test that we cannot try to allocate more memory than we have in the whole pool
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
+ BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
+ ByteBuffer buffer = pool.allocate(1024);
+ assertEquals(1024, buffer.limit());
+ pool.deallocate(buffer);
+ buffer = pool.allocate(1025);
+ }
+
+ @Test
+ public void testNonblockingMode() throws Exception {
+ BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
+ pool.allocate(1);
+ try {
+ pool.allocate(2);
+ fail("The buffer allocated more than it has!");
+ } catch (BufferExhaustedException e) {
+ // this is good
+ }
+ }
+
+ /**
+ * Test that delayed allocation blocks
+ */
+ @Test
+ public void testDelayedAllocation() throws Exception {
+ BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
+ ByteBuffer buffer = pool.allocate(1024);
+ CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
+ CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
+ assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount());
+ doDealloc.countDown(); // return the memory
+ allocation.await();
+ }
+
+ private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ pool.deallocate(buffer);
+ }
+ };
+ thread.start();
+ return latch;
+ }
+
+ private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
+ final CountDownLatch completed = new CountDownLatch(1);
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ pool.allocate(size);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ completed.countDown();
+ }
+ }
+ };
+ thread.start();
+ return completed;
+ }
+
+ /**
+ * This test creates lots of threads that hammer on the pool
+ */
+ @Test
+ public void testStressfulSituation() throws Exception {
+ int numThreads = 10;
+ final int iterations = 50000;
+ final int poolableSize = 1024;
+ final long totalMemory = numThreads / 2 * poolableSize;
+ final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
+ List<StressTestThread> threads = new ArrayList<StressTestThread>();
+ for (int i = 0; i < numThreads; i++)
+ threads.add(new StressTestThread(pool, iterations));
+ for (StressTestThread thread : threads)
+ thread.start();
+ for (StressTestThread thread : threads)
+ thread.join();
+ for (StressTestThread thread : threads)
+ assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
+ assertEquals(totalMemory, pool.availableMemory());
+ }
+
+ public static class StressTestThread extends Thread {
+ private final int iterations;
+ private final BufferPool pool;
+ public final AtomicBoolean success = new AtomicBoolean(false);
+
+ public StressTestThread(BufferPool pool, int iterations) {
+ this.iterations = iterations;
+ this.pool = pool;
+ }
+
+ public void run() {
+ try {
+ for (int i = 0; i < iterations; i++) {
+ int size;
+ if (TestUtils.RANDOM.nextBoolean())
+ // allocate poolable size
+ size = pool.poolableSize();
+ else
+ // allocate a random size
+ size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
+ ByteBuffer buffer = pool.allocate(size);
+ pool.deallocate(buffer);
+ }
+ success.set(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}