You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/07 01:26:39 UTC
[08/13] Rename client package from kafka.* to org.apache.kafka.*
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/KafkaThread.java b/clients/src/main/java/kafka/common/utils/KafkaThread.java
deleted file mode 100644
index f830aba..0000000
--- a/clients/src/main/java/kafka/common/utils/KafkaThread.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common.utils;
-
-/**
- * A wrapper for Thread that sets things up nicely
- */
-public class KafkaThread extends Thread {
-
- public KafkaThread(String name, Runnable runnable, boolean daemon) {
- super(runnable, name);
- setDaemon(daemon);
- setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- e.printStackTrace();
- }
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/SystemTime.java b/clients/src/main/java/kafka/common/utils/SystemTime.java
deleted file mode 100644
index c8ca09c..0000000
--- a/clients/src/main/java/kafka/common/utils/SystemTime.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package kafka.common.utils;
-
-/**
- * A time implementation that uses the system clock and sleep call
- */
-public class SystemTime implements Time {
-
- @Override
- public long milliseconds() {
- return System.currentTimeMillis();
- }
-
- public long nanoseconds() {
- return System.nanoTime();
- }
-
- @Override
- public void sleep(long ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException e) {
- // no stress
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Time.java b/clients/src/main/java/kafka/common/utils/Time.java
deleted file mode 100644
index 390d16f..0000000
--- a/clients/src/main/java/kafka/common/utils/Time.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.utils;
-
-/**
- * An interface abstracting the clock to use in unit testing classes that make use of clock time
- */
-public interface Time {
-
- /**
- * The current time in milliseconds
- */
- public long milliseconds();
-
- /**
- * The current time in nanoseconds
- */
- public long nanoseconds();
-
- /**
- * Sleep for the given number of milliseconds
- */
- public void sleep(long ms);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/utils/Utils.java b/clients/src/main/java/kafka/common/utils/Utils.java
deleted file mode 100644
index f132771..0000000
--- a/clients/src/main/java/kafka/common/utils/Utils.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package kafka.common.utils;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-
-import kafka.common.KafkaException;
-
-public class Utils {
-
- /**
- * Turn the given UTF8 byte array into a string
- *
- * @param bytes The byte array
- * @return The string
- */
- public static String utf8(byte[] bytes) {
- try {
- return new String(bytes, "UTF8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("This shouldn't happen.", e);
- }
- }
-
- /**
- * Turn a string into a utf8 byte[]
- *
- * @param string The string
- * @return The byte[]
- */
- public static byte[] utf8(String string) {
- try {
- return string.getBytes("UTF8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("This shouldn't happen.", e);
- }
- }
-
- /**
- * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
- *
- * @param buffer The buffer to read from
- * @return The integer read, as a long to avoid signedness
- */
- public static long readUnsignedInt(ByteBuffer buffer) {
- return buffer.getInt() & 0xffffffffL;
- }
-
- /**
- * Read an unsigned integer from the given position without modifying the buffers position
- *
- * @param buffer the buffer to read from
- * @param index the index from which to read the integer
- * @return The integer read, as a long to avoid signedness
- */
- public static long readUnsignedInt(ByteBuffer buffer, int index) {
- return buffer.getInt(index) & 0xffffffffL;
- }
-
- /**
- * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
- *
- * @param buffer The buffer to write to
- * @param value The value to write
- */
- public static void writetUnsignedInt(ByteBuffer buffer, long value) {
- buffer.putInt((int) (value & 0xffffffffL));
- }
-
- /**
- * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
- *
- * @param buffer The buffer to write to
- * @param index The position in the buffer at which to begin writing
- * @param value The value to write
- */
- public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
- buffer.putInt(index, (int) (value & 0xffffffffL));
- }
-
- /**
- * Compute the CRC32 of the byte array
- *
- * @param bytes The array to compute the checksum for
- * @return The CRC32
- */
- public static long crc32(byte[] bytes) {
- return crc32(bytes, 0, bytes.length);
- }
-
- /**
- * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
- *
- * @param bytes The bytes to checksum
- * @param offset the offset at which to begin checksumming
- * @param size the number of bytes to checksum
- * @return The CRC32
- */
- public static long crc32(byte[] bytes, int offset, int size) {
- Crc32 crc = new Crc32();
- crc.update(bytes, offset, size);
- return crc.getValue();
- }
-
- /**
- * Get the absolute value of the given number. If the number is Int.MinValue return 0. This is different from
- * java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
- */
- public static int abs(int n) {
- return n & 0x7fffffff;
- }
-
- /**
- * Get the length for UTF8-encoding a string without encoding it first
- *
- * @param s The string to calculate the length for
- * @return The length when serialized
- */
- public static int utf8Length(CharSequence s) {
- int count = 0;
- for (int i = 0, len = s.length(); i < len; i++) {
- char ch = s.charAt(i);
- if (ch <= 0x7F) {
- count++;
- } else if (ch <= 0x7FF) {
- count += 2;
- } else if (Character.isHighSurrogate(ch)) {
- count += 4;
- ++i;
- } else {
- count += 3;
- }
- }
- return count;
- }
-
- /**
- * Read the given byte buffer into a byte array
- */
- public static byte[] toArray(ByteBuffer buffer) {
- return toArray(buffer, 0, buffer.limit());
- }
-
- /**
- * Read a byte array from the given offset and size in the buffer
- */
- public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
- byte[] dest = new byte[size];
- if (buffer.hasArray()) {
- System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
- } else {
- int pos = buffer.position();
- buffer.get(dest);
- buffer.position(pos);
- }
- return dest;
- }
-
- /**
- * Check that the parameter t is not null
- *
- * @param t The object to check
- * @return t if it isn't null
- * @throws NullPointerException if t is null.
- */
- public static <T> T notNull(T t) {
- if (t == null)
- throw new NullPointerException();
- else
- return t;
- }
-
- /**
- * Instantiate the class
- */
- public static Object newInstance(Class<?> c) {
- try {
- return c.newInstance();
- } catch (IllegalAccessException e) {
- throw new KafkaException("Could not instantiate class " + c.getName(), e);
- } catch (InstantiationException e) {
- throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e);
- }
- }
-
- /**
- * Generates 32 bit murmur2 hash from byte array
- * @param data byte array to hash
- * @return 32 bit hash of the given array
- */
- public static int murmur2(final byte[] data) {
- int length = data.length;
- int seed = 0x9747b28c;
- // 'm' and 'r' are mixing constants generated offline.
- // They're not really 'magic', they just happen to work well.
- final int m = 0x5bd1e995;
- final int r = 24;
-
- // Initialize the hash to a random value
- int h = seed ^ length;
- int length4 = length / 4;
-
- for (int i = 0; i < length4; i++) {
- final int i4 = i * 4;
- int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
- k *= m;
- k ^= k >>> r;
- k *= m;
- h *= m;
- h ^= k;
- }
-
- // Handle the last few bytes of the input array
- switch (length % 4) {
- case 3:
- h ^= (data[(length & ~3) + 2] & 0xff) << 16;
- case 2:
- h ^= (data[(length & ~3) + 1] & 0xff) << 8;
- case 1:
- h ^= (data[length & ~3] & 0xff);
- h *= m;
- }
-
- h ^= h >>> 13;
- h *= m;
- h ^= h >>> 15;
-
- return h;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
new file mode 100644
index 0000000..f3ed4ea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
@@ -0,0 +1,17 @@
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
+ * which data can be sent for long enough for the allotted buffer to be exhausted.
+ */
+public class BufferExhaustedException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public BufferExhaustedException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
new file mode 100644
index 0000000..ad7d740
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -0,0 +1,18 @@
+package org.apache.kafka.clients.producer;
+
+/**
+ * A callback interface that the user can implement to allow code to execute when the request is complete. This callback
+ * will generally execute in the background I/O thread so it should be fast.
+ */
+public interface Callback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+ * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+ * non-null.
+ * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+ * occurred.
+ * @param exception The exception thrown during processing of this record. Null if no error occurred.
+ */
+ public void onCompletion(RecordMetadata metadata, Exception exception);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
new file mode 100644
index 0000000..dcc40d3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -0,0 +1,287 @@
+package org.apache.kafka.clients.producer;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.clients.producer.internals.Partitioner;
+import org.apache.kafka.clients.producer.internals.RecordAccumulator;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.SystemTime;
+
+
+/**
+ * A Kafka client that publishes records to the Kafka cluster.
+ * <P>
+ * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * <p>
+ * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
+ * needs to communicate with. Failure to close the producer after use will leak these resources.
+ */
+public class KafkaProducer implements Producer {
+
+ private final Partitioner partitioner;
+ private final int maxRequestSize;
+ private final long metadataFetchTimeoutMs;
+ private final long totalMemorySize;
+ private final Metadata metadata;
+ private final RecordAccumulator accumulator;
+ private final Sender sender;
+ private final Metrics metrics;
+ private final Thread ioThread;
+
+ /**
+ * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+ * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
+ * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+ * string "42" or the integer 42).
+ */
+ public KafkaProducer(Map<String, Object> configs) {
+ this(new ProducerConfig(configs));
+ }
+
+ /**
+ * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+ * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+ */
+ public KafkaProducer(Properties properties) {
+ this(new ProducerConfig(properties));
+ }
+
+ private KafkaProducer(ProducerConfig config) {
+ this.metrics = new Metrics(new MetricConfig(),
+ Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
+ new SystemTime());
+ this.partitioner = new Partitioner();
+ this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+ this.metadata = new Metadata();
+ this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
+ this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+ this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
+ this.totalMemorySize,
+ config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+ config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL),
+ metrics,
+ new SystemTime());
+ List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis());
+ this.sender = new Sender(new Selector(),
+ this.metadata,
+ this.accumulator,
+ config.getString(ProducerConfig.CLIENT_ID_CONFIG),
+ config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+ config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
+ config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
+ new SystemTime());
+ this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
+ this.ioThread.start();
+ }
+
+ private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String url : urls) {
+ if (url != null && url.length() > 0) {
+ String[] pieces = url.split(":");
+ if (pieces.length != 2)
+ throw new ConfigException("Invalid url in metadata.broker.list: " + url);
+ try {
+ InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+ if (address.isUnresolved())
+ throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+ addresses.add(address);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+ }
+ }
+ }
+ if (addresses.size() < 1)
+ throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+ return addresses;
+ }
+
+ /**
+ * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
+ */
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord record) {
+ return send(record, null);
+ }
+
+ /**
+ * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
+ * <p>
+ * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
+ * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
+ * response after each one.
+ * <p>
+ * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
+ * it was assigned.
+ * <p>
+ * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
+ * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
+ * get()} on this future will result in the metadata for the record or throw any exception that occurred while
+ * sending the record.
+ * <p>
+ * If you want to simulate a simple blocking call you can do the following:
+ *
+ * <pre>
+ * producer.send(new ProducerRecord("the-topic", "key", "value")).get();
+ * </pre>
+ * <p>
+ * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
+ * will be invoked when the request is complete.
+ *
+ * <pre>
+ * ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
+ * producer.send(record,
+ * new Callback() {
+ * public void onCompletion(RecordMetadata metadata, Exception e) {
+ * if(e != null)
+ * e.printStackTrace();
+ * else System.out.println("The offset of the record we just sent is: " + metadata.offset());
+ * }
+ * });
+ * </pre>
+ *
+ * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
+ * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
+ *
+ * <pre>
+ * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+ * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+ * </pre>
+ * <p>
+ * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
+ * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
+ * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
+ * to parallelize processing.
+ * <p>
+ * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is
+ * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
+ * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
+ * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
+ * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
+ * producer to instead throw an exception when buffer memory is exhausted.
+ *
+ * @param record The record to send
+ * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
+ * indicates no callback)
+ */
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+ try {
+ Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+ int partition = partitioner.partition(record, cluster);
+ ensureValidSize(record.key(), record.value());
+ TopicPartition tp = new TopicPartition(record.topic(), partition);
+ FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
+ this.sender.wakeup();
+ return future;
+ } catch (Exception e) {
+ if (callback != null)
+ callback.onCompletion(null, e);
+ return new FutureFailure(e);
+ }
+ }
+
+ /**
+ * Check that this key-value pair will have a serialized size small enough
+ */
+ private void ensureValidSize(byte[] key, byte[] value) {
+ int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ if (serializedSize > this.maxRequestSize)
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the maximum request size you have configured with the "
+ + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+ + " configuration.");
+ if (serializedSize > this.totalMemorySize)
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the total memory buffer you have configured with the "
+ + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+ + " configuration.");
+ }
+
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
+ }
+
+ @Override
+ public Map<String, ? extends Metric> metrics() {
+ return Collections.unmodifiableMap(this.metrics.metrics());
+ }
+
+ /**
+ * Close this producer. This method blocks until all in-flight requests complete.
+ */
+ @Override
+ public void close() {
+ this.sender.initiateClose();
+ try {
+ this.ioThread.join();
+ } catch (InterruptedException e) {
+ throw new KafkaException(e);
+ }
+ this.metrics.close();
+ }
+
+ private static class FutureFailure implements Future<RecordMetadata> {
+
+ private final ExecutionException exception;
+
+ public FutureFailure(Exception exception) {
+ this.exception = new ExecutionException(exception);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
new file mode 100644
index 0000000..76dbd53
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -0,0 +1,200 @@
+package org.apache.kafka.clients.producer;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
+import org.apache.kafka.clients.producer.internals.Partitioner;
+import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+
+/**
+ * A mock of the producer interface you can use for testing code that uses Kafka.
+ * <p>
+ * By default this mock will synchronously complete each send call successfully. However it can be configured to allow
+ * the user to control the completion of the call and supply an optional error for the producer to throw.
+ */
+public class MockProducer implements Producer {
+
+ private final Cluster cluster;
+ private final Partitioner partitioner = new Partitioner();
+ private final List<ProducerRecord> sent;
+ private final Deque<Completion> completions;
+ private boolean autoComplete;
+ private Map<TopicPartition, Long> offsets;
+
+ /**
+ * Create a mock producer
+ *
+ * @param cluster The cluster holding metadata for this producer
+ * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
+ * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
+ * {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
+ * java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
+ */
+ public MockProducer(Cluster cluster, boolean autoComplete) {
+ this.cluster = cluster;
+ this.autoComplete = autoComplete;
+ this.offsets = new HashMap<TopicPartition, Long>();
+ this.sent = new ArrayList<ProducerRecord>();
+ this.completions = new ArrayDeque<Completion>();
+ }
+
+ /**
+ * Create a new mock producer with invented metadata and the given autoComplete setting.
+ *
+ * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(Cluster.empty(), autoComplete)}
+ */
+ public MockProducer(boolean autoComplete) {
+ this(Cluster.empty(), autoComplete);
+ }
+
+ /**
+ * Create a new auto completing mock producer
+ *
+ * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
+ */
+ public MockProducer() {
+ this(true);
+ }
+
+ /**
+ * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
+ *
+ * @see #history()
+ */
+ @Override
+ public synchronized Future<RecordMetadata> send(ProducerRecord record) {
+ return send(record, null);
+ }
+
+ /**
+ * Adds the record to the list of sent records.
+ *
+ * @see #history()
+ */
+ @Override
+ public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+ int partition = 0;
+ if (this.cluster.partitionsFor(record.topic()) != null)
+ partition = partitioner.partition(record, this.cluster);
+ ProduceRequestResult result = new ProduceRequestResult();
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
+ TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
+ long offset = nextOffset(topicPartition);
+ Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
+ this.sent.add(record);
+ if (autoComplete)
+ completion.complete(null);
+ else
+ this.completions.addLast(completion);
+ return future;
+ }
+
+ /**
+ * Get the next offset for this topic/partition
+ */
+ private long nextOffset(TopicPartition tp) {
+ Long offset = this.offsets.get(tp);
+ if (offset == null) {
+ this.offsets.put(tp, 1L);
+ return 0L;
+ } else {
+ Long next = offset + 1;
+ this.offsets.put(tp, next);
+ return offset;
+ }
+ }
+
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this.cluster.partitionsFor(topic);
+ }
+
+ public Map<String, Metric> metrics() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ /**
+ * Get the list of sent records since the last call to {@link #clear()}
+ */
+ public synchronized List<ProducerRecord> history() {
+ return new ArrayList<ProducerRecord>(this.sent);
+ }
+
+ /**
+ * Clear the stored history of sent records
+ */
+ public synchronized void clear() {
+ this.sent.clear();
+ this.completions.clear();
+ }
+
+ /**
+ * Complete the earliest uncompleted call successfully.
+ *
+ * @return true if there was an uncompleted call to complete
+ */
+ public synchronized boolean completeNext() {
+ return errorNext(null);
+ }
+
+ /**
+ * Complete the earliest uncompleted call with the given error.
+ *
+ * @return true if there was an uncompleted call to complete
+ */
+ public synchronized boolean errorNext(RuntimeException e) {
+ Completion completion = this.completions.pollFirst();
+ if (completion != null) {
+ completion.complete(e);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private static class Completion {
+ private final long offset;
+ private final RecordMetadata metadata;
+ private final ProduceRequestResult result;
+ private final Callback callback;
+ private final TopicPartition topicPartition;
+
+ public Completion(TopicPartition topicPartition,
+ long offset,
+ RecordMetadata metadata,
+ ProduceRequestResult result,
+ Callback callback) {
+ this.metadata = metadata;
+ this.offset = offset;
+ this.result = result;
+ this.callback = callback;
+ this.topicPartition = topicPartition;
+ }
+
+ public void complete(RuntimeException e) {
+ result.done(topicPartition, e == null ? offset : -1L, e);
+ if (callback != null) {
+ if (e == null)
+ callback.onCompletion(metadata, null);
+ else
+ callback.onCompletion(null, e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
new file mode 100644
index 0000000..7daea9f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -0,0 +1,49 @@
+package org.apache.kafka.clients.producer;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+
+
+/**
+ * The interface for the {@link KafkaProducer}
+ *
+ * @see KafkaProducer
+ * @see MockProducer
+ */
+public interface Producer extends Closeable {
+
+ /**
+ * Send the given record asynchronously and return a future which will eventually contain the response information.
+ *
+ * @param record The record to send
+ * @return A future which will eventually contain the response information
+ */
+ public Future<RecordMetadata> send(ProducerRecord record);
+
+ /**
+ * Send a record and invoke the given callback when the record has been acknowledged by the server
+ */
+ public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
+
+ /**
+ * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
+ * over time so this list should not be cached.
+ */
+ public List<PartitionInfo> partitionsFor(String topic);
+
+ /**
+ * Return a map of metrics maintained by the producer
+ */
+ public Map<String, ? extends Metric> metrics();
+
+ /**
+ * Close this producer
+ */
+ public void close();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
new file mode 100644
index 0000000..77955a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -0,0 +1,131 @@
+package org.apache.kafka.clients.producer;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+
+/**
+ * The producer configuration keys
+ */
+public class ProducerConfig extends AbstractConfig {
+
+ private static final ConfigDef config;
+
+ /**
+ * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
+ * <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
+ * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
+ * may want more than one, though, in case a server is down).
+ */
+ public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
+
+ /**
+ * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
+ * topic.
+ */
+ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+
+ /**
+ * The buffer size allocated for a partition. When records are received which are smaller than this size the
+ * producer will attempt to optimistically group them together until this size is reached.
+ */
+ public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
+
+ /**
+ * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
+ * faster than they can be delivered to the server the producer will either block or throw an exception based on the
+ * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
+ */
+ public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+
+ /**
+ * The number of acknowledgments the producer requires from the server before considering a request complete.
+ */
+ public static final String REQUIRED_ACKS_CONFIG = "request.required.acks";
+
+ /**
+ * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment
+ * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be
+ * returned.
+ */
+ public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
+
+ /**
+ * The producer groups together any records that arrive in between request sends. Normally this occurs only under
+ * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
+ * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
+ * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
+ * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
+ * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
+ * This setting defaults to 0.
+ */
+ public static final String LINGER_MS_CONFIG = "linger.ms";
+
+ /**
+ * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
+ * partitions or other settings will be taken up by producers without restart.
+ */
+ public static final String METADATA_REFRESH_MS_CONFIG = "topic.metadata.refresh.interval.ms";
+
+ /**
+ * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
+ * of requests beyond just ip/port by allowing a logical application name to be included.
+ */
+ public static final String CLIENT_ID_CONFIG = "client.id";
+
+ /**
+ * The size of the TCP send buffer to use when sending data
+ */
+ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+
+ /**
+ * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
+ * has its own cap on record size which may be different from this.
+ */
+ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
+
+ /**
+ * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a
+ * host in a tight loop.
+ */
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+
+ /**
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
+ * this setting is true and we block, however users who want to guarantee we never block can turn this into an
+ * error.
+ */
+ public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
+
+ public static final String ENABLE_JMX = "enable.jmx";
+
+ static {
+ /* TODO: add docs */
+ config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
+ .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
+ .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
+ .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
+ /* TODO: should be a string to handle acks=in-sync */
+ .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
+ .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
+ .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
+ .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
+ .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
+ .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
+ .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
+ .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah")
+ .define(ENABLE_JMX, Type.BOOLEAN, true, "");
+ }
+
+ ProducerConfig(Map<? extends Object, ? extends Object> props) {
+ super(config, props);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
new file mode 100644
index 0000000..10d1b96
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -0,0 +1,84 @@
+package org.apache.kafka.clients.producer;
+
+/**
+ * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
+ * partition number, and an optional key and value.
+ * <p>
+ * If a valid partition number is specified that partition will be used when sending the record. If no partition is
+ * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
+ * present a partition will be assigned in a round-robin fashion.
+ */
+public final class ProducerRecord {
+
+ private final String topic;
+ private final Integer partition;
+ private final byte[] key;
+ private final byte[] value;
+
+ /**
+ * Creates a record to be sent to a specified topic and partition
+ *
+ * @param topic The topic the record will be appended to
+ * @param partition The partition to which the record should be sent
+ * @param key The key that will be included in the record
+ * @param value The record contents
+ */
+ public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
+ if (topic == null)
+ throw new IllegalArgumentException("Topic cannot be null");
+ this.topic = topic;
+ this.partition = partition;
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Create a record to be sent to Kafka
+ *
+ * @param topic The topic the record will be appended to
+ * @param key The key that will be included in the record
+ * @param value The record contents
+ */
+ public ProducerRecord(String topic, byte[] key, byte[] value) {
+ this(topic, null, key, value);
+ }
+
+ /**
+ * Create a record with no key
+ *
+ * @param topic The topic this record should be sent to
+ * @param value The record contents
+ */
+ public ProducerRecord(String topic, byte[] value) {
+ this(topic, null, value);
+ }
+
+ /**
+ * The topic this record is being sent to
+ */
+ public String topic() {
+ return topic;
+ }
+
+ /**
+ * The key (or null if no key is specified)
+ */
+ public byte[] key() {
+ return key;
+ }
+
+ /**
+ * @return The value
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ /**
+ * The partition to which the record will be sent (or null if no partition was specified)
+ */
+ public Integer partition() {
+ return partition;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
new file mode 100644
index 0000000..5118b3a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -0,0 +1,39 @@
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * The metadata for a record that has been acknowledged by the server
+ */
+public final class RecordMetadata {
+
+ private final long offset;
+ private final TopicPartition topicPartition;
+
+ public RecordMetadata(TopicPartition topicPartition, long offset) {
+ super();
+ this.offset = offset;
+ this.topicPartition = topicPartition;
+ }
+
+ /**
+ * The offset of the record in the topic/partition.
+ */
+ public long offset() {
+ return this.offset;
+ }
+
+ /**
+ * The topic the record was appended to
+ */
+ public String topic() {
+ return this.topicPartition.topic();
+ }
+
+ /**
+ * The partition the record was sent to
+ */
+ public int partition() {
+ return this.topicPartition.partition();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
new file mode 100644
index 0000000..2548dad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -0,0 +1,224 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.kafka.clients.producer.BufferExhaustedException;
+
+
+/**
+ * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
+ * particular it has the following properties:
+ * <ol>
+ * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
+ * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
+ * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
+ * buffers are deallocated.
+ * </ol>
+ */
+public final class BufferPool {
+
+ private final long totalMemory;
+ private final int poolableSize;
+ private final boolean blockOnExhaustion;
+ private final ReentrantLock lock;
+ private final Deque<ByteBuffer> free;
+ private final Deque<Condition> waiters;
+ private long availableMemory;
+
+ /**
+ * Create a new buffer pool
+ *
+ * @param memory The maximum amount of memory that this buffer pool can allocate
+ * @param poolableSize The buffer size to cache in the free list rather than deallocating
+ * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
+ * {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
+ * {@link #allocate(int)} will throw an exception if the buffer is out of memory.
+ */
+ public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
+ this.poolableSize = poolableSize;
+ this.blockOnExhaustion = blockOnExhaustion;
+ this.lock = new ReentrantLock();
+ this.free = new ArrayDeque<ByteBuffer>();
+ this.waiters = new ArrayDeque<Condition>();
+ this.totalMemory = memory;
+ this.availableMemory = memory;
+ }
+
+ /**
+ * Allocate a buffer of the given size
+ *
+ * @param size The buffer size to allocate in bytes
+ * @return The buffer
+ * @throws InterruptedException If the thread is interrupted while blocked
+ * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
+ * forever)
+ * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
+ */
+ public ByteBuffer allocate(int size) throws InterruptedException {
+ if (size > this.totalMemory)
+ throw new IllegalArgumentException("Attempt to allocate " + size
+ + " bytes, but there is a hard limit of "
+ + this.totalMemory
+ + " on memory allocations.");
+
+ this.lock.lock();
+ try {
+ // check if we have a free buffer of the right size pooled
+ if (size == poolableSize && !this.free.isEmpty())
+ return this.free.pollFirst();
+
+ // now check if the request is immediately satisfiable with the
+ // memory on hand or if we need to block
+ int freeListSize = this.free.size() * this.poolableSize;
+ if (this.availableMemory + freeListSize >= size) {
+ // we have enough unallocated or pooled memory to immediately
+ // satisfy the request
+ freeUp(size);
+ this.availableMemory -= size;
+ lock.unlock();
+ return ByteBuffer.allocate(size);
+ } else if (!blockOnExhaustion) {
+ throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
+ + " bytes of memory you configured for the client and the client is configured to error"
+ + " rather than block when memory is exhausted.");
+ } else {
+ // we are out of memory and will have to block
+ int accumulated = 0;
+ ByteBuffer buffer = null;
+ Condition moreMemory = this.lock.newCondition();
+ this.waiters.addLast(moreMemory);
+ // loop over and over until we have a buffer or have reserved
+ // enough memory to allocate one
+ while (accumulated < size) {
+ moreMemory.await();
+ // check if we can satisfy this request from the free list,
+ // otherwise allocate memory
+ if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
+ // just grab a buffer from the free list
+ buffer = this.free.pollFirst();
+ accumulated = size;
+ } else {
+ // we'll need to allocate memory, but we may only get
+ // part of what we need on this iteration
+ freeUp(size - accumulated);
+ int got = (int) Math.min(size - accumulated, this.availableMemory);
+ this.availableMemory -= got;
+ accumulated += got;
+ }
+ }
+
+ // remove the condition for this thread to let the next thread
+ // in line start getting memory
+ Condition removed = this.waiters.removeFirst();
+ if (removed != moreMemory)
+ throw new IllegalStateException("Wrong condition: this shouldn't happen.");
+
+ // signal any additional waiters if there is more memory left
+ // over for them
+ if (this.availableMemory > 0 || !this.free.isEmpty()) {
+ if (!this.waiters.isEmpty())
+ this.waiters.peekFirst().signal();
+ }
+
+ // unlock and return the buffer
+ lock.unlock();
+ if (buffer == null)
+ return ByteBuffer.allocate(size);
+ else
+ return buffer;
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread())
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
+ * buffers (if needed)
+ */
+ private void freeUp(int size) {
+ while (!this.free.isEmpty() && this.availableMemory < size)
+ this.availableMemory += this.free.pollLast().capacity();
+ }
+
+ /**
+ * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
+ * memory as free.
+ *
+ * @param buffers The buffers to return
+ */
+ public void deallocate(ByteBuffer... buffers) {
+ lock.lock();
+ try {
+ for (int i = 0; i < buffers.length; i++) {
+ int size = buffers[i].capacity();
+ if (size == this.poolableSize) {
+ buffers[i].clear();
+ this.free.add(buffers[i]);
+ } else {
+ this.availableMemory += size;
+ }
+ Condition moreMem = this.waiters.peekFirst();
+ if (moreMem != null)
+ moreMem.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * The total free memory, both unallocated and in the free list
+ */
+ public long availableMemory() {
+ lock.lock();
+ try {
+ return this.availableMemory + this.free.size() * this.poolableSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Get the unallocated memory (not in the free list or in use)
+ */
+ public long unallocatedMemory() {
+ lock.lock();
+ try {
+ return this.availableMemory;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * The number of threads blocked waiting on memory
+ */
+ public int queued() {
+ lock.lock();
+ try {
+ return this.waiters.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * The buffer size that will be retained in the free list after use
+ */
+ public int poolableSize() {
+ return this.poolableSize;
+ }
+
+ /**
+ * The total memory managed by this pool
+ */
+ public long totalMemory() {
+ return this.totalMemory;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
new file mode 100644
index 0000000..f4c7970
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -0,0 +1,64 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+
+/**
+ * The future result of a record send
+ */
+public final class FutureRecordMetadata implements Future<RecordMetadata> {
+
+ private final ProduceRequestResult result;
+ private final long relativeOffset;
+
+ public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
+ this.result = result;
+ this.relativeOffset = relativeOffset;
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws InterruptedException, ExecutionException {
+ this.result.await();
+ return valueOrError();
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ boolean occurred = this.result.await(timeout, unit);
+ if (!occurred)
+ throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+ return valueOrError();
+ }
+
+ private RecordMetadata valueOrError() throws ExecutionException {
+ if (this.result.error() != null)
+ throw new ExecutionException(this.result.error());
+ else
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+ }
+
+ public long relativeOffset() {
+ return this.relativeOffset;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return this.result.completed();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
new file mode 100644
index 0000000..87c3cba
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -0,0 +1,121 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ *
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
+ * topic we don't have any metadata for, it will trigger a metadata update.
+ */
+public final class Metadata {
+
+ private final long refreshBackoffMs;
+ private final long metadataExpireMs;
+ private long lastRefresh;
+ private Cluster cluster;
+ private boolean forceUpdate;
+ private final Set<String> topics;
+
+ /**
+ * Create a metadata instance with reasonable defaults
+ */
+ public Metadata() {
+ this(100L, 60 * 60 * 1000L);
+ }
+
+ /**
+ * Create a new Metadata instance
+ * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+ * polling
+ * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ */
+ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ this.refreshBackoffMs = refreshBackoffMs;
+ this.metadataExpireMs = metadataExpireMs;
+ this.lastRefresh = 0L;
+ this.cluster = Cluster.empty();
+ this.forceUpdate = false;
+ this.topics = new HashSet<String>();
+ }
+
+ /**
+ * Get the current cluster info without blocking
+ */
+ public synchronized Cluster fetch() {
+ return this.cluster;
+ }
+
+ /**
+ * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
+ * block waiting for an update.
+ * @param topic The topic we want metadata for
+ * @param maxWaitMs The maximum amount of time to block waiting for metadata
+ */
+ public synchronized Cluster fetch(String topic, long maxWaitMs) {
+ List<PartitionInfo> partitions = null;
+ do {
+ partitions = cluster.partitionsFor(topic);
+ if (partitions == null) {
+ long begin = System.currentTimeMillis();
+ topics.add(topic);
+ forceUpdate = true;
+ try {
+ wait(maxWaitMs);
+ } catch (InterruptedException e) { /* this is fine, just try again */
+ }
+ long ellapsed = System.currentTimeMillis() - begin;
+ if (ellapsed > maxWaitMs)
+ throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+ } else {
+ return cluster;
+ }
+ } while (true);
+ }
+
+ /**
+ * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs
+ * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
+ * than metadataExpireMs has passed since the last refresh)
+ */
+ public synchronized boolean needsUpdate(long now) {
+ long msSinceLastUpdate = now - this.lastRefresh;
+ boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
+ boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
+ return updateAllowed && updateNeeded;
+ }
+
+ /**
+ * Force an update of the current cluster info
+ */
+ public synchronized void forceUpdate() {
+ this.forceUpdate = true;
+ }
+
+ /**
+ * Get the list of topics we are currently maintaining metadata for
+ */
+ public synchronized Set<String> topics() {
+ return new HashSet<String>(this.topics);
+ }
+
+ /**
+ * Update the cluster metadata
+ */
+ public synchronized void update(Cluster cluster, long now) {
+ this.forceUpdate = false;
+ this.lastRefresh = now;
+ this.cluster = cluster;
+ notifyAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
new file mode 100644
index 0000000..04fcae9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -0,0 +1,56 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * The default partitioning strategy:
+ * <ul>
+ * <li>If a partition is specified in the record, use it
+ * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
+ * <li>If no partition or key is present choose a partition in a round-robin fashion
+ */
+public class Partitioner {
+
+ private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+ /**
+ * Compute the partition for the given record.
+ *
+ * @param record The record being sent
+ * @param cluster The cluster whose partition list for the record's topic is used to choose the partition
+ */
+ public int partition(ProducerRecord record, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
+ int numPartitions = partitions.size();
+ if (record.partition() != null) {
+ // they have given us a partition, use it
+ if (record.partition() < 0 || record.partition() >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ + " is not in the range [0..."
+ + numPartitions
+ + "].");
+ return record.partition();
+ } else if (record.key() == null) {
+ // choose the next available node in a round-robin fashion
+ for (int i = 0; i < numPartitions; i++) {
+ int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
+ if (partitions.get(partition).leader() != null)
+ return partition;
+ }
+ // no partitions are available, give a non-available partition
+ return Utils.abs(counter.getAndIncrement()) % numPartitions;
+ } else {
+ // hash the key to choose a partition
+ return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
new file mode 100644
index 0000000..ffeea1a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -0,0 +1,82 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+
+/**
+ * A class that models the future completion of a produce request for a single partition. There is one of these per
+ * partition in a produce request and it is shared by all the {@link RecordMetadata} instances that are batched together
+ * for the same partition in the request.
+ */
+public final class ProduceRequestResult {
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private volatile TopicPartition topicPartition;
+ private volatile long baseOffset = -1L;
+ private volatile RuntimeException error;
+
+ public ProduceRequestResult() {
+ }
+
+ /**
+ * Mark this request as complete and unblock any threads waiting on its completion.
+ * @param topicPartition The topic and partition to which this record set was sent
+ * @param baseOffset The base offset assigned to the record
+ * @param error The error that occurred if there was one, or null.
+ */
+ public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
+ this.topicPartition = topicPartition;
+ this.baseOffset = baseOffset;
+ this.error = error;
+ this.latch.countDown();
+ }
+
+ /**
+ * Await the completion of this request
+ */
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+
+ /**
+ * Await the completion of this request (up to the given time interval)
+ * @param timeout The maximum time to wait
+ * @param unit The unit for the max time
+ * @return true if the request completed, false if we timed out
+ */
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return latch.await(timeout, unit);
+ }
+
+ /**
+ * The base offset for the request (the first offset in the record set)
+ */
+ public long baseOffset() {
+ return baseOffset;
+ }
+
+ /**
+ * The error thrown (generally on the server) while processing this request
+ */
+ public RuntimeException error() {
+ return error;
+ }
+
+ /**
+ * The topic and partition to which the record was appended
+ */
+ public TopicPartition topicPartition() {
+ return topicPartition;
+ }
+
+ /**
+ * Has the request completed?
+ */
+ public boolean completed() {
+ return this.latch.getCount() == 0L;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
new file mode 100644
index 0000000..18eff5d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -0,0 +1,235 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+
+
+/**
+ * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be
+ * sent to the server.
+ * <p>
+ * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
+ * this behavior is explicitly disabled.
+ */
+public final class RecordAccumulator {
+
+ private volatile boolean closed; // set by close(); read by append() and ready()
+ private int drainIndex; // index into the partition list at which the next drain() starts
+ private final int batchSize;
+ private final long lingerMs;
+ private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; // each deque is locked on itself
+ private final BufferPool free;
+ private final Time time;
+
+ /**
+ * Create a new record accumulator
+ *
+ * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
+ * @param totalSize The maximum memory the record accumulator can use.
+ * @param lingerMs An artificial delay to add before a records instance that isn't full is declared ready for sending,
+ * giving more records time to arrive. A non-zero lingerMs trades some latency for more batching.
+ * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception instead
+ * @param metrics The metrics registry in which the buffer gauges are registered
+ * @param time The time instance used to timestamp new batches
+ */
+ public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+ this.drainIndex = 0;
+ this.closed = false;
+ this.batchSize = batchSize;
+ this.lingerMs = lingerMs;
+ this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
+ this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
+ this.time = time;
+ registerMetrics(metrics);
+ }
+
+ /** Register gauges that report the state of the buffer pool. */
+ private void registerMetrics(Metrics metrics) {
+ metrics.addMetric("blocked_threads",
+ "The number of user threads blocked waiting for buffer memory to enqueue their records",
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.queued();
+ }
+ });
+ metrics.addMetric("buffer_total_bytes",
+ "The maximum amount of buffer memory the record accumulator can use (whether or not it is currently in use).",
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.totalMemory();
+ }
+ });
+ metrics.addMetric("buffer_available_bytes",
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return free.availableMemory();
+ }
+ });
+ }
+
+ /**
+ * Add a record to the accumulator.
+ * <p>
+ * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
+ *
+ * @param tp The topic/partition to which this record is being sent
+ * @param key The key for the record
+ * @param value The value for the record
+ * @param compression The compression codec for the record
+ * @param callback The user-supplied callback to execute when the request is complete
+ * @return A future for the metadata of the appended record
+ * @throws IllegalStateException if the accumulator has already been closed
+ */
+ public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+ if (closed)
+ throw new IllegalStateException("Cannot send after the producer is closed.");
+ // check if we have an in-progress batch
+ Deque<RecordBatch> dq = dequeFor(tp);
+ synchronized (dq) {
+ RecordBatch last = dq.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
+ if (future != null)
+ return future;
+ }
+ }
+
+ // no in-progress batch had room: allocate a buffer outside the lock, since the allocation may block
+ int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+ ByteBuffer buffer = free.allocate(size);
+ synchronized (dq) {
+ RecordBatch last = dq.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(key, value, compression, callback);
+ if (future != null) {
+ // somebody else created a batch while we were allocating: give back our buffer (hopefully this is rare)
+ free.deallocate(buffer);
+ return future;
+ }
+ }
+ RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+ dq.addLast(batch);
+ return future;
+ }
+ }
+
+ /**
+ * Get a list of topic-partitions which are ready to be sent.
+ * <p>A partition is ready if ANY of the following are true:
+ * <ol>
+ * <li>The record set is full (or another batch is already queued behind it)
+ * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
+ * <li>Threads are blocked waiting for buffer memory (all partitions with data are then immediately ready)
+ * <li>The accumulator has been closed
+ * </ol>
+ * @param now The current time in milliseconds
+ * @return The partitions whose oldest batch is ready to be sent
+ */
+ public List<TopicPartition> ready(long now) {
+ List<TopicPartition> ready = new ArrayList<TopicPartition>();
+ boolean exhausted = this.free.queued() > 0;
+ for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+ Deque<RecordBatch> deque = entry.getValue();
+ synchronized (deque) {
+ RecordBatch batch = deque.peekFirst();
+ if (batch != null) {
+ boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
+ boolean expired = now - batch.created >= lingerMs;
+ if (full || expired || exhausted || closed)
+ ready.add(batch.topicPartition);
+ }
+ }
+ }
+ return ready;
+ }
+
+ /**
+ * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
+ * to avoid choosing the same topic-partitions over and over. Partitions with nothing queued are skipped.
+ *
+ * @param partitions The list of partitions to drain
+ * @param maxSize The maximum number of bytes to drain
+ * @return A list of {@link RecordBatch} for partitions specified with total size of at most the requested maxSize.
+ * TODO: There may be a starvation issue due to iteration order
+ */
+ public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize) {
+ if (partitions.isEmpty())
+ return Collections.emptyList();
+ int size = 0;
+ List<RecordBatch> ready = new ArrayList<RecordBatch>();
+ /* to make starvation less likely this loop doesn't start at 0 */
+ int start = drainIndex = drainIndex % partitions.size();
+ do {
+ TopicPartition tp = partitions.get(drainIndex);
+ Deque<RecordBatch> deque = dequeFor(tp);
+ synchronized (deque) {
+ RecordBatch first = deque.peekFirst();
+ // the deque may be empty (dequeFor() creates one on demand), in which case there is nothing to drain
+ if (first != null) {
+ if (size + first.records.sizeInBytes() > maxSize)
+ return ready;
+ deque.pollFirst();
+ size += first.records.sizeInBytes();
+ ready.add(first);
+ }
+ }
+ this.drainIndex = (this.drainIndex + 1) % partitions.size();
+ } while (start != drainIndex);
+ return ready;
+ }
+
+ /**
+ * Get the deque for the given topic-partition, creating it if necessary. Since new topics will only be added rarely
+ * we copy-on-write the hashmap. Never returns null, since this class never removes entries from the map.
+ */
+ private Deque<RecordBatch> dequeFor(TopicPartition tp) {
+ Deque<RecordBatch> d = this.batches.get(tp);
+ if (d != null)
+ return d;
+ this.batches.putIfAbsent(tp, new ArrayDeque<RecordBatch>());
+ return this.batches.get(tp);
+ }
+
+ /**
+ * Hand the buffers backing the given record batches back to the buffer pool
+ */
+ public void deallocate(Collection<RecordBatch> batches) {
+ ByteBuffer[] buffers = new ByteBuffer[batches.size()];
+ int i = 0;
+ for (RecordBatch batch : batches) {
+ buffers[i] = batch.records.buffer();
+ i++;
+ }
+ free.deallocate(buffers);
+ }
+
+ /**
+ * Close this accumulator and force all the record buffers to be drained
+ */
+ public void close() {
+ this.closed = true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
new file mode 100644
index 0000000..6ba392e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -0,0 +1,87 @@
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+
+
+/**
+ * A batch of records that is or will be sent.
+ *
+ * This class is not thread safe and external synchronization must be used when modifying it
+ */
+public final class RecordBatch {
+ public int recordCount = 0; // number of records appended so far
+ public final long created; // the "now" value passed to the constructor
+ public final MemoryRecords records;
+ public final TopicPartition topicPartition;
+ private final ProduceRequestResult produceFuture; // one result shared by every record in this batch
+ private final List<Thunk> thunks; // callbacks to run from done(), in append order
+
+ public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
+ this.topicPartition = tp;
+ this.records = records;
+ this.created = now;
+ this.thunks = new ArrayList<Thunk>();
+ this.produceFuture = new ProduceRequestResult();
+ }
+
+ /**
+ * Append the record to the current record set if there is room for it, registering the callback (if any).
+ *
+ * @return A future for the appended record's metadata, or null if there isn't sufficient room.
+ */
+ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+ if (!this.records.hasRoomFor(key, value))
+ return null;
+ this.records.append(0L, key, value, compression);
+ // the record's index within the batch serves as its offset relative to the batch's base offset
+ FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
+ // callbacks are optional, so a thunk is only recorded when one was supplied
+ if (callback != null)
+ thunks.add(new Thunk(callback, this.recordCount));
+ this.recordCount++;
+ return future;
+ }
+
+ /**
+ * Complete the request and run the callbacks registered through tryAppend
+ *
+ * @param offset The base offset of the batch; each record's offset is this value plus its relative offset
+ * @param exception The error that occurred, or null if there was none
+ */
+ public void done(long offset, RuntimeException exception) {
+ // complete the shared result first, then notify each registered callback
+ produceFuture.done(this.topicPartition, offset, exception);
+ for (int i = 0; i < thunks.size(); i++) {
+ try {
+ Thunk thunk = thunks.get(i);
+ if (exception != null) {
+ thunk.callback.onCompletion(null, exception);
+ } else {
+ long recordOffset = produceFuture.baseOffset() + thunk.relativeOffset;
+ thunk.callback.onCompletion(new RecordMetadata(this.topicPartition, recordOffset), null);
+ }
+ } catch (Exception e) {
+ e.printStackTrace(); // a failing callback must not prevent the remaining callbacks from running
+ }
+ }
+ }
+
+ /**
+ * A user callback paired with the offset of its record relative to the start of the batch.
+ */
+ private static final class Thunk {
+ final Callback callback;
+ final long relativeOffset;
+
+ public Thunk(Callback callback, long relativeOffset) {
+ this.callback = callback;
+ this.relativeOffset = relativeOffset;
+ }
+ }
+}
\ No newline at end of file