You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/04 17:40:20 UTC
[2/2] git commit: Implement a few of the API suggestions from the
mailing list.
Implement a few of the API suggestions from the mailing list.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/253f86e3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/253f86e3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/253f86e3
Branch: refs/heads/trunk
Commit: 253f86e31062fb86401abdc13835c251eef47417
Parents: c9052c5
Author: Jay Kreps <ja...@gmail.com>
Authored: Sun Feb 2 21:52:25 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 4 08:39:56 2014 -0800
----------------------------------------------------------------------
.../java/kafka/clients/producer/Callback.java | 13 +-
.../clients/producer/DefaultPartitioner.java | 35 ----
.../kafka/clients/producer/KafkaProducer.java | 164 ++++++++++++-------
.../kafka/clients/producer/MockProducer.java | 82 +++++-----
.../kafka/clients/producer/Partitioner.java | 30 ----
.../java/kafka/clients/producer/Producer.java | 18 +-
.../kafka/clients/producer/ProducerConfig.java | 46 ++----
.../kafka/clients/producer/ProducerRecord.java | 46 +++---
.../kafka/clients/producer/RecordMetadata.java | 39 +++++
.../java/kafka/clients/producer/RecordSend.java | 88 ----------
.../internals/FutureRecordMetadata.java | 63 +++++++
.../clients/producer/internals/Partitioner.java | 55 +++++++
.../internals/ProduceRequestResult.java | 34 ++--
.../producer/internals/RecordAccumulator.java | 28 ++--
.../clients/producer/internals/RecordBatch.java | 32 ++--
.../clients/producer/internals/Sender.java | 5 +-
.../clients/tools/ProducerPerformance.java | 31 ++--
.../java/kafka/common/ByteSerialization.java | 18 --
clients/src/main/java/kafka/common/Cluster.java | 47 ++++--
.../main/java/kafka/common/Deserializer.java | 18 --
.../main/java/kafka/common/PartitionInfo.java | 14 +-
.../src/main/java/kafka/common/Serializer.java | 21 ---
.../java/kafka/common/StringSerialization.java | 58 -------
.../common/errors/CorruptMessageException.java | 23 ---
.../common/errors/CorruptRecordException.java | 23 +++
.../common/errors/MessageTooLargeException.java | 23 ---
.../common/errors/RecordTooLargeException.java | 23 +++
.../kafka/common/network/ByteBufferReceive.java | 4 +-
.../main/java/kafka/common/protocol/Errors.java | 8 +-
.../java/kafka/common/protocol/ProtoUtils.java | 28 ++--
.../java/kafka/common/protocol/Protocol.java | 2 +-
.../java/kafka/common/record/MemoryRecords.java | 4 +-
.../main/java/kafka/common/record/Record.java | 4 +-
.../kafka/clients/producer/MetadataTest.java | 10 +-
.../clients/producer/MockProducerTest.java | 67 ++++----
.../kafka/clients/producer/PartitionerTest.java | 54 ++++++
.../kafka/clients/producer/RecordSendTest.java | 32 ++--
.../java/kafka/clients/producer/SenderTest.java | 17 +-
clients/src/test/java/kafka/test/TestUtils.java | 22 +++
39 files changed, 670 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java
index 47e5af3..d287d78 100644
--- a/clients/src/main/java/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/kafka/clients/producer/Callback.java
@@ -2,14 +2,17 @@ package kafka.clients.producer;
/**
* A callback interface that the user can implement to allow code to execute when the request is complete. This callback
- * will execute in the background I/O thread so it should be fast.
+ * will generally execute in the background I/O thread so it should be fast.
*/
public interface Callback {
/**
- * A callback method the user should implement. This method will be called when the send to the server has
- * completed.
- * @param send The results of the call. This send is guaranteed to be completed so none of its methods will block.
+ * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+ * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+ * non-null.
+ * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+ * occurred.
+ * @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
- public void onCompletion(RecordSend send);
+ public void onCompletion(RecordMetadata metadata, Exception exception);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
deleted file mode 100644
index b82fcfb..0000000
--- a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.common.Cluster;
-import kafka.common.utils.Utils;
-
-/**
- * A simple partitioning strategy that will work for messages with or without keys.
- * <p>
- * If there is a partition key specified in the record the partitioner will use that for partitioning. Otherwise, if
- * there there is no partitionKey but there is a normal key that will be used. If neither key is specified the
- * partitioner will round-robin over partitions in the topic.
- * <p>
- * For the cases where there is some key present the partition is computed based on the murmur2 hash of the serialized
- * key.
- */
-public class DefaultPartitioner implements Partitioner {
-
- private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
- /**
- * Compute the partition
- */
- @Override
- public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions) {
- byte[] keyToUse = partitionKey != null ? partitionKey : key;
- if (keyToUse == null)
- return Utils.abs(counter.getAndIncrement()) % numPartitions;
- else
- return Utils.abs(Utils.murmur2(keyToUse)) % numPartitions;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
index 58eee0c..1dd63fc 100644
--- a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
@@ -6,17 +6,22 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import kafka.clients.producer.internals.FutureRecordMetadata;
import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.Partitioner;
import kafka.clients.producer.internals.RecordAccumulator;
import kafka.clients.producer.internals.Sender;
import kafka.common.Cluster;
import kafka.common.KafkaException;
import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
import kafka.common.TopicPartition;
import kafka.common.config.ConfigException;
-import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.RecordTooLargeException;
import kafka.common.metrics.JmxReporter;
import kafka.common.metrics.MetricConfig;
import kafka.common.metrics.Metrics;
@@ -29,24 +34,22 @@ import kafka.common.utils.KafkaThread;
import kafka.common.utils.SystemTime;
/**
- * A Kafka producer that can be used to send data to the Kafka cluster.
+ * A Kafka client that publishes records to the Kafka cluster.
* <P>
* The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
* <p>
* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these.
+ * needs to communicate with. Failure to close the producer after use will leak these resources.
*/
public class KafkaProducer implements Producer {
+ private final Partitioner partitioner;
private final int maxRequestSize;
private final long metadataFetchTimeoutMs;
private final long totalMemorySize;
- private final Partitioner partitioner;
private final Metadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
- private final Serializer keySerializer;
- private final Serializer valueSerializer;
private final Metrics metrics;
private final Thread ioThread;
@@ -72,9 +75,7 @@ public class KafkaProducer implements Producer {
this.metrics = new Metrics(new MetricConfig(),
Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
new SystemTime());
- this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+ this.partitioner = new Partitioner();
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.metadata = new Metadata();
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
@@ -126,78 +127,84 @@ public class KafkaProducer implements Producer {
* Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
*/
@Override
- public RecordSend send(ProducerRecord record) {
+ public Future<RecordMetadata> send(ProducerRecord record) {
return send(record, null);
}
/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
* <p>
- * The send is asynchronous and this method will return immediately once the record has been serialized and stored
- * in the buffer of messages waiting to be sent. This allows sending many records in parallel without necessitating
- * blocking to wait for the response after each one.
+ * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
+ * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
+ * response after each one.
+ * <p>
+ * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
+ * it was assigned.
+ * <p>
+ * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
+ * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
+ * get()} on this future will result in the metadata for the record or throw any exception that occurred while
+ * sending the record.
* <p>
- * The {@link RecordSend} returned by this call will hold the future response data including the offset assigned to
- * the message and the error (if any) when the request has completed (or returned an error), and this object can be
- * used to block awaiting the response. If you want the equivalent of a simple blocking send you can easily achieve
- * that using the {@link kafka.clients.producer.RecordSend#await() await()} method on the {@link RecordSend} this
- * call returns:
+ * If you want to simulate a simple blocking call you can do the following:
*
* <pre>
- * ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
- * producer.send(myRecord, null).await();
+ * producer.send(new ProducerRecord("the-topic", "key", "value")).get();
* </pre>
- *
- * Note that the send method will not throw an exception if the request fails while communicating with the cluster,
- * rather that exception will be thrown when accessing the {@link RecordSend} that is returned.
* <p>
* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
- * will be invoked when the request is complete. Note that the callback will execute in the I/O thread of the
- * producer and so should be reasonably fast. An example usage of an inline callback would be the following:
+ * will be invoked when the request is complete.
*
* <pre>
* ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
* producer.send(myRecord,
* new Callback() {
- * public void onCompletion(RecordSend send) {
- * try {
- * System.out.println("The offset of the message we just sent is: " + send.offset());
- * } catch(KafkaException e) {
+ * public void onCompletion(RecordMetadata metadata, Exception e) {
+ * if(e != null)
* e.printStackTrace();
- * }
+ * else System.out.println("The offset of the record we just sent is: " + metadata.offset());
* }
* });
* </pre>
+ *
+ * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
+ * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
+ *
+ * <pre>
+ * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+ * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+ * </pre>
* <p>
- * This call enqueues the message in the buffer of outgoing messages to be sent. This buffer has a hard limit on
- * it's size controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called
- * faster than the I/O thread can send data to the brokers we will eventually run out of buffer space. The default
- * behavior in this case is to block the send call until the I/O thread catches up and more buffer space is
- * available. However if non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will
- * cause the producer to instead throw an exception when this occurs.
+ * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
+ * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
+ * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
+ * to parallelize processing.
+ * <p>
+ * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is
+ * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
+ * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
+ * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
+ * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
+ * producer to instead throw an exception when buffer memory is exhausted.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
- * @throws BufferExhausedException This exception is thrown if the buffer is full and blocking has been disabled.
- * @throws MessageTooLargeException This exception is thrown if the serialized size of the message is larger than
- * the maximum buffer memory or maximum request size that has been configured (whichever is smaller).
*/
@Override
- public RecordSend send(ProducerRecord record, Callback callback) {
- Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
- byte[] key = keySerializer.toBytes(record.key());
- byte[] value = valueSerializer.toBytes(record.value());
- byte[] partitionKey = keySerializer.toBytes(record.partitionKey());
- int partition = partitioner.partition(record, key, partitionKey, value, cluster, cluster.partitionsFor(record.topic()).size());
- ensureValidSize(key, value);
+ public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
try {
+ Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+ int partition = partitioner.partition(record, cluster);
+ ensureValidSize(record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), partition);
- RecordSend send = accumulator.append(tp, key, value, CompressionType.NONE, callback);
+ FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup();
- return send;
- } catch (InterruptedException e) {
- throw new KafkaException(e);
+ return future;
+ } catch (Exception e) {
+ if (callback != null)
+ callback.onCompletion(null, e);
+ return new FutureFailure(e);
}
}
@@ -207,15 +214,19 @@ public class KafkaProducer implements Producer {
private void ensureValidSize(byte[] key, byte[] value) {
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
if (serializedSize > this.maxRequestSize)
- throw new MessageTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the maximum request size you have configured with the "
- + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
- + " configuration.");
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the maximum request size you have configured with the "
+ + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+ + " configuration.");
if (serializedSize > this.totalMemorySize)
- throw new MessageTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the total memory buffer you have configured with the "
- + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
- + " configuration.");
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the total memory buffer you have configured with the "
+ + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+ + " configuration.");
+ }
+
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
}
@Override
@@ -237,4 +248,39 @@ public class KafkaProducer implements Producer {
this.metrics.close();
}
+ private static class FutureFailure implements Future<RecordMetadata> {
+
+ private final ExecutionException exception;
+
+ public FutureFailure(Exception exception) {
+ this.exception = new ExecutionException(exception);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java
index 2ea2030..ab83d5f 100644
--- a/clients/src/main/java/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/MockProducer.java
@@ -7,11 +7,14 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
+import kafka.clients.producer.internals.FutureRecordMetadata;
+import kafka.clients.producer.internals.Partitioner;
import kafka.clients.producer.internals.ProduceRequestResult;
import kafka.common.Cluster;
import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
import kafka.common.TopicPartition;
/**
@@ -22,10 +25,8 @@ import kafka.common.TopicPartition;
*/
public class MockProducer implements Producer {
- private final Serializer keySerializer;
- private final Serializer valueSerializer;
- private final Partitioner partitioner;
private final Cluster cluster;
+ private final Partitioner partitioner = new Partitioner();
private final List<ProducerRecord> sent;
private final Deque<Completion> completions;
private boolean autoComplete;
@@ -34,21 +35,13 @@ public class MockProducer implements Producer {
/**
* Create a mock producer
*
- * @param keySerializer A serializer to use on keys (useful to test your serializer on the values)
- * @param valueSerializer A serializer to use on values (useful to test your serializer on the values)
- * @param partitioner A partitioner to choose partitions (if null the partition will always be 0)
- * @param cluster The cluster to pass to the partitioner (can be null if partitioner is null)
+ * @param cluster The cluster holding metadata for this producer
* @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
* the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
- * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link RecordSend} that is
- * returned.
+ * {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
+ * java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
*/
- public MockProducer(Serializer keySerializer, Serializer valueSerializer, Partitioner partitioner, Cluster cluster, boolean autoComplete) {
- if (partitioner != null && (cluster == null | keySerializer == null | valueSerializer == null))
- throw new IllegalArgumentException("If a partitioner is provided a cluster instance and key and value serializer for partitioning must also be given.");
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.partitioner = partitioner;
+ public MockProducer(Cluster cluster, boolean autoComplete) {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.offsets = new HashMap<TopicPartition, Long>();
@@ -57,17 +50,16 @@ public class MockProducer implements Producer {
}
/**
- * Create a new mock producer with no serializers or partitioner and the given autoComplete setting.
+ * Create a new mock producer with invented metadata and the given autoComplete setting.
*
- * Equivalent to {@link #MockProducer(Serializer, Serializer, Partitioner, Cluster, boolean) new MockProducer(null,
- * null, null, null, autoComplete)}
+ * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(Cluster.empty(), autoComplete)}
*/
public MockProducer(boolean autoComplete) {
- this(null, null, null, null, autoComplete);
+ this(Cluster.empty(), autoComplete);
}
/**
- * Create a new auto completing mock producer with no serializers or partitioner.
+ * Create a new auto completing mock producer
*
* Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
*/
@@ -76,39 +68,36 @@ public class MockProducer implements Producer {
}
/**
- * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied.
+ * Adds the record to the list of sent records. The returned future for the {@link RecordMetadata} will be immediately satisfied when autoComplete is enabled.
*
* @see #history()
*/
@Override
- public synchronized RecordSend send(ProducerRecord record) {
+ public synchronized Future<RecordMetadata> send(ProducerRecord record) {
return send(record, null);
}
/**
- * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied and
- * the callback will be synchronously executed.
+ * Adds the record to the list of sent records.
*
* @see #history()
*/
@Override
- public synchronized RecordSend send(ProducerRecord record, Callback callback) {
- byte[] key = keySerializer == null ? null : keySerializer.toBytes(record.key());
- byte[] partitionKey = keySerializer == null ? null : keySerializer.toBytes(record.partitionKey());
- byte[] value = valueSerializer == null ? null : valueSerializer.toBytes(record.value());
- int numPartitions = partitioner == null ? 0 : this.cluster.partitionsFor(record.topic()).size();
- int partition = partitioner == null ? 0 : partitioner.partition(record, key, partitionKey, value, this.cluster, numPartitions);
+ public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+ int partition = 0;
+ if (this.cluster.partitionsFor(record.topic()) != null)
+ partition = partitioner.partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
- RecordSend send = new RecordSend(0, result);
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
- Completion completion = new Completion(topicPartition, offset, send, result, callback);
+ Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);
else
this.completions.addLast(completion);
- return send;
+ return future;
}
/**
@@ -126,13 +115,14 @@ public class MockProducer implements Producer {
}
}
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this.cluster.partitionsFor(topic);
+ }
+
public Map<String, Metric> metrics() {
return Collections.emptyMap();
}
- /**
- * "Closes" the producer
- */
@Override
public void close() {
}
@@ -178,13 +168,17 @@ public class MockProducer implements Producer {
private static class Completion {
private final long offset;
- private final RecordSend send;
+ private final RecordMetadata metadata;
private final ProduceRequestResult result;
private final Callback callback;
private final TopicPartition topicPartition;
- public Completion(TopicPartition topicPartition, long offset, RecordSend send, ProduceRequestResult result, Callback callback) {
- this.send = send;
+ public Completion(TopicPartition topicPartition,
+ long offset,
+ RecordMetadata metadata,
+ ProduceRequestResult result,
+ Callback callback) {
+ this.metadata = metadata;
this.offset = offset;
this.result = result;
this.callback = callback;
@@ -193,8 +187,12 @@ public class MockProducer implements Producer {
public void complete(RuntimeException e) {
result.done(topicPartition, e == null ? offset : -1L, e);
- if (callback != null)
- callback.onCompletion(send);
+ if (callback != null) {
+ if (e == null)
+ callback.onCompletion(metadata, null);
+ else
+ callback.onCompletion(null, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Partitioner.java b/clients/src/main/java/kafka/clients/producer/Partitioner.java
deleted file mode 100644
index 1b8e51f..0000000
--- a/clients/src/main/java/kafka/clients/producer/Partitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package kafka.clients.producer;
-
-import kafka.common.Cluster;
-
-/**
- * An interface by which clients can override the default partitioning behavior that maps records to topic partitions.
- * <p>
- * A partitioner can use either the original java object the user provided or the serialized bytes.
- * <p>
- * It is expected that the partitioner will make use the key for partitioning, but there is no requirement that an
- * implementation do so. An implementation can use the key, the value, the state of the cluster, or any other side data.
- */
-public interface Partitioner {
-
- /**
- * Compute the partition for the given record. This partition number must be in the range [0...numPartitions). The
- * cluster state provided is the most up-to-date view that the client has but leadership can change at any time so
- * there is no guarantee that the node that is the leader for a particular partition at the time the partition
- * function is called will still be the leader by the time the request is sent.
- *
- * @param record The record being sent
- * @param key The serialized bytes of the key (null if no key is given or the serialized form is null)
- * @param value The serialized bytes of the value (null if no value is given or the serialized form is null)
- * @param cluster The current state of the cluster
- * @param numPartitions The total number of partitions for the given topic
- * @return The partition to send this record to
- */
- public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java
index 6ba6633..f149f3a 100644
--- a/clients/src/main/java/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/kafka/clients/producer/Producer.java
@@ -1,8 +1,12 @@
package kafka.clients.producer;
+import java.io.Closeable;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import kafka.common.Metric;
+import kafka.common.PartitionInfo;
/**
* The interface for the {@link KafkaProducer}
@@ -10,7 +14,7 @@ import kafka.common.Metric;
* @see KafkaProducer
* @see MockProducer
*/
-public interface Producer {
+public interface Producer extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -18,12 +22,18 @@ public interface Producer {
* @param record The record to send
* @return A future which will eventually contain the response information
*/
- public RecordSend send(ProducerRecord record);
+ public Future<RecordMetadata> send(ProducerRecord record);
/**
- * Send a message and invoke the given callback when the send is complete
+ * Send a record and invoke the given callback when the record has been acknowledged by the server
*/
- public RecordSend send(ProducerRecord record, Callback callback);
+ public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
+
+ /**
+ * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
+ * over time so this list should not be cached.
+ */
+ public List<PartitionInfo> partitionsFor(String topic);
/**
* Return a map of metrics maintained by the producer
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
index 9758293..a94afc7 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
@@ -25,19 +25,19 @@ public class ProducerConfig extends AbstractConfig {
public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
/**
- * The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that
+ * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
* topic.
*/
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
/**
- * The buffer size allocated for a partition. When messages are received which are smaller than this size the
+ * The buffer size allocated for a partition. When records are received which are smaller than this size the
* producer will attempt to optimistically group them together until this size is reached.
*/
public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
/**
- * The total memory used by the producer to buffer messages waiting to be sent to the server. If messages are sent
+ * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
* faster than they can be delivered to the server the producer will either block or throw an exception based on the
* preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
*/
@@ -56,32 +56,17 @@ public class ProducerConfig extends AbstractConfig {
public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
/**
- * The producer groups together any messages that arrive in between request sends. Normally this occurs only under
- * load when messages arrive faster than they can be sent out. However the client can reduce the number of requests
- * and increase throughput by adding a small amount of artificial delay to force more messages to batch together.
- * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of messages
+ * The producer groups together any records that arrive in between request sends. Normally this occurs only under
+ * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
+ * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
+ * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
* for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
- * bytes accumulated for this partition we will "linger" for the specified time waiting for more messages to show
- * up. This setting defaults to 0.
+ * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
+ * This setting defaults to 0.
*/
public static final String LINGER_MS_CONFIG = "linger.ms";
/**
- * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record values.
- */
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer.class";
-
- /**
- * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record keys.
- */
- public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer.class";
-
- /**
- * The class to use for choosing a partition to send the message to
- */
- public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
-
- /**
* Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
* partitions or other settings will be taken up by producers without restart.
*/
@@ -99,8 +84,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
/**
- * The maximum size of a request. This is also effectively a cap on the maximum message size. Note that the server
- * has its own cap on message size which may be different from this.
+ * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
+ * has its own cap on record size which may be different from this.
*/
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
@@ -111,9 +96,9 @@ public class ProducerConfig extends AbstractConfig {
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
/**
- * When our memory buffer is exhausted we must either stop accepting new messages (block) or throw errors. By
- * default this setting is true and we block, however users who want to guarantee we never block can turn this into
- * an error.
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
+ * this setting is true and we block, however users who want to guarantee we never block can turn this into an
+ * error.
*/
public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
@@ -129,9 +114,6 @@ public class ProducerConfig extends AbstractConfig {
.define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
.define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
- .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
- .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, DefaultPartitioner.class.getName(), "blah blah")
.define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
index 5fddbef..b9c20bc 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
@@ -1,34 +1,34 @@
package kafka.clients.producer;
/**
- * An unserialized key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent,
- * a value (which can be null) which is the contents of the record and an optional key (which can also be null). In
- * cases the key used for choosing a partition is going to be different the user can specify a partition key which will
- * be used only for computing the partition to which this record will be sent and will not be retained with the record.
+ * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
+ * partition number, and an optional key and value.
+ * <p>
+ * If a valid partition number is specified that partition will be used when sending the record. If no partition is
+ * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
+ * present a partition will be assigned in a round-robin fashion.
*/
public final class ProducerRecord {
private final String topic;
- private final Object key;
- private final Object partitionKey;
- private final Object value;
+ private final Integer partition;
+ private final byte[] key;
+ private final byte[] value;
/**
- * Creates a record to be sent to Kafka using a special override key for partitioning that is different form the key
- * retained in the record
+ * Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
+ * @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
- * @param partitionKey An override for the key to be used only for partitioning purposes in the client. This key
- * will not be retained or available to downstream consumers.
* @param value The record contents
*/
- public ProducerRecord(String topic, Object key, Object partitionKey, Object value) {
+ public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
+ this.partition = partition;
this.key = key;
- this.partitionKey = partitionKey;
this.value = value;
}
@@ -39,8 +39,8 @@ public final class ProducerRecord {
* @param key The key that will be included in the record
* @param value The record contents
*/
- public ProducerRecord(String topic, Object key, Object value) {
- this(topic, key, key, value);
+ public ProducerRecord(String topic, byte[] key, byte[] value) {
+ this(topic, null, key, value);
}
/**
@@ -49,7 +49,7 @@ public final class ProducerRecord {
* @param topic The topic this record should be sent to
* @param value The record contents
*/
- public ProducerRecord(String topic, Object value) {
+ public ProducerRecord(String topic, byte[] value) {
this(topic, null, value);
}
@@ -63,22 +63,22 @@ public final class ProducerRecord {
/**
* The key (or null if no key is specified)
*/
- public Object key() {
+ public byte[] key() {
return key;
}
/**
- * An override key to use instead of the main record key
+ * @return The value
*/
- public Object partitionKey() {
- return partitionKey;
+ public byte[] value() {
+ return value;
}
/**
- * @return The value
+ * The partition to which the record will be sent (or null if no partition was specified)
*/
- public Object value() {
- return value;
+ public Integer partition() {
+ return partition;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
new file mode 100644
index 0000000..1486586
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
@@ -0,0 +1,39 @@
+package kafka.clients.producer;
+
+import kafka.common.TopicPartition;
+
+/**
+ * The metadata for a record that has been acknowledged by the server
+ */
+public final class RecordMetadata {
+
+ private final long offset;
+ private final TopicPartition topicPartition;
+
+ public RecordMetadata(TopicPartition topicPartition, long offset) {
+ super();
+ this.offset = offset;
+ this.topicPartition = topicPartition;
+ }
+
+ /**
+ * The offset of the record in the topic/partition.
+ */
+ public long offset() {
+ return this.offset;
+ }
+
+ /**
+ * The topic the record was appended to
+ */
+ public String topic() {
+ return this.topicPartition.topic();
+ }
+
+ /**
+ * The partition the record was sent to
+ */
+ public int partition() {
+ return this.topicPartition.partition();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/RecordSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordSend.java b/clients/src/main/java/kafka/clients/producer/RecordSend.java
deleted file mode 100644
index 1883dab..0000000
--- a/clients/src/main/java/kafka/clients/producer/RecordSend.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.concurrent.TimeUnit;
-
-import kafka.clients.producer.internals.ProduceRequestResult;
-import kafka.common.errors.ApiException;
-import kafka.common.errors.TimeoutException;
-
-/**
- * An asynchronously computed response from sending a record. Calling <code>await()</code> or most of the other accessor
- * methods will block until the response for this record is available. If you wish to avoid blocking provide a
- * {@link kafka.clients.producer.Callback Callback} with the record send.
- */
-public final class RecordSend {
-
- private final long relativeOffset;
- private final ProduceRequestResult result;
-
- public RecordSend(long relativeOffset, ProduceRequestResult result) {
- this.relativeOffset = relativeOffset;
- this.result = result;
- }
-
- /**
- * Block until this send has completed successfully. If the request fails, throw the error that occurred in sending
- * the request.
- * @return the same object for chaining of calls
- * @throws TimeoutException if the thread is interrupted while waiting
- * @throws ApiException if the request failed.
- */
- public RecordSend await() {
- result.await();
- if (result.error() != null)
- throw result.error();
- return this;
- }
-
- /**
- * Block until this send is complete or the given timeout elapses
- * @param timeout the time to wait
- * @param unit the units of the time given
- * @return the same object for chaining
- * @throws TimeoutException if the request isn't satisfied in the time period given or the thread is interrupted
- * while waiting
- * @throws ApiException if the request failed.
- */
- public RecordSend await(long timeout, TimeUnit unit) {
- boolean success = result.await(timeout, unit);
- if (!success)
- throw new TimeoutException("Request did not complete after " + timeout + " " + unit);
- if (result.error() != null)
- throw result.error();
- return this;
- }
-
- /**
- * Get the offset for the given message. This method will block until the request is complete and will throw an
- * exception if the request fails.
- * @return The offset
- */
- public long offset() {
- await();
- return this.result.baseOffset() + this.relativeOffset;
- }
-
- /**
- * Check if the request is complete without blocking
- */
- public boolean completed() {
- return this.result.completed();
- }
-
- /**
- * Block on request completion and return true if there was an error.
- */
- public boolean hasError() {
- result.await();
- return this.result.error() != null;
- }
-
- /**
- * Return the error thrown
- */
- public Exception error() {
- result.await();
- return this.result.error();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
new file mode 100644
index 0000000..43b4c5d
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -0,0 +1,63 @@
+package kafka.clients.producer.internals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import kafka.clients.producer.RecordMetadata;
+
+/**
+ * The future result of a record send
+ */
+public final class FutureRecordMetadata implements Future<RecordMetadata> {
+
+ private final ProduceRequestResult result;
+ private final long relativeOffset;
+
+ public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
+ this.result = result;
+ this.relativeOffset = relativeOffset;
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws InterruptedException, ExecutionException {
+ this.result.await();
+ return valueOrError();
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ boolean occurred = this.result.await(timeout, unit);
+ if (!occurred)
+ throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+ return valueOrError();
+ }
+
+ private RecordMetadata valueOrError() throws ExecutionException {
+ if (this.result.error() != null)
+ throw new ExecutionException(this.result.error());
+ else
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+ }
+
+ public long relativeOffset() {
+ return this.relativeOffset;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return this.result.completed();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
new file mode 100644
index 0000000..6d2188e
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
@@ -0,0 +1,55 @@
+package kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.clients.producer.ProducerRecord;
+import kafka.common.Cluster;
+import kafka.common.PartitionInfo;
+import kafka.common.utils.Utils;
+
+/**
+ * The default partitioning strategy:
+ * <ul>
+ * <li>If a partition is specified in the record, use it
+ * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
+ * <li>If no partition or key is present choose a partition in a round-robin fashion
+ */
+public class Partitioner {
+
+ private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+ /**
+ * Compute the partition for the given record.
+ *
+ * @param record The record being sent
+ * @param cluster The current cluster metadata, used to look up the partitions for the record's topic
+ */
+ public int partition(ProducerRecord record, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
+ int numPartitions = partitions.size();
+ if (record.partition() != null) {
+ // they have given us a partition, use it
+ if (record.partition() < 0 || record.partition() >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ + " is not in the range [0..."
+ + numPartitions
+ + "].");
+ return record.partition();
+ } else if (record.key() == null) {
+ // choose the next available partition in a round-robin fashion
+ for (int i = 0; i < numPartitions; i++) {
+ int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
+ if (partitions.get(partition).leader() != null)
+ return partition;
+ }
+ // no partitions are available, give a non-available partition
+ return Utils.abs(counter.getAndIncrement()) % numPartitions;
+ } else {
+ // hash the key to choose a partition
+ return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
index 1049b61..cdae00a 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -3,14 +3,13 @@ package kafka.clients.producer.internals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
import kafka.common.TopicPartition;
-import kafka.common.errors.TimeoutException;
/**
* A class that models the future completion of a produce request for a single partition. There is one of these per
- * partition in a produce request and it is shared by all the {@link RecordSend} instances that are batched together for
- * the same partition in the request.
+ * partition in a produce request and it is shared by all the {@link FutureRecordMetadata} instances that are batched
+ * together for the same partition in the request.
*/
public final class ProduceRequestResult {
@@ -25,7 +24,7 @@ public final class ProduceRequestResult {
/**
* Mark this request as complete and unblock any threads waiting on its completion.
* @param topicPartition The topic and partition to which this record set was sent
- * @param baseOffset The base offset assigned to the message
+ * @param baseOffset The base offset assigned to the record
* @param error The error that occurred if there was one, or null.
*/
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
@@ -38,12 +37,8 @@ public final class ProduceRequestResult {
/**
* Await the completion of this request
*/
- public void await() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for request to complete.");
- }
+ public void await() throws InterruptedException {
+ latch.await();
}
/**
@@ -52,16 +47,12 @@ public final class ProduceRequestResult {
* @param unit The unit for the max time
* @return true if the request completed, false if we timed out
*/
- public boolean await(long timeout, TimeUnit unit) {
- try {
- return latch.await(timeout, unit);
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for request to complete.");
- }
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return latch.await(timeout, unit);
}
/**
- * The base offset for the request (the first offset in the message set)
+ * The base offset for the request (the first offset in the record set)
*/
public long baseOffset() {
return baseOffset;
@@ -75,6 +66,13 @@ public final class ProduceRequestResult {
}
/**
+ * The topic and partition to which the record was appended
+ */
+ public TopicPartition topicPartition() {
+ return topicPartition;
+ }
+
+ /**
* Has the request completed?
*/
public boolean completed() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
index a2b536c..c22939f 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
@@ -11,7 +11,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
import kafka.common.TopicPartition;
import kafka.common.metrics.Measurable;
import kafka.common.metrics.MetricConfig;
@@ -67,21 +66,21 @@ public final class RecordAccumulator {
private void registerMetrics(Metrics metrics) {
metrics.addMetric("blocked_threads",
- "The number of user threads blocked waiting for buffer memory to enqueue their messages",
+ "The number of user threads blocked waiting for buffer memory to enqueue their records",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
});
metrics.addMetric("buffer_total_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering messages).",
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
});
metrics.addMetric("buffer_available_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering messages).",
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.availableMemory();
@@ -100,7 +99,7 @@ public final class RecordAccumulator {
* @param compression The compression codec for the record
* @param callback The user-supplied callback to execute when the request is complete
*/
- public RecordSend append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+ public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// check if we have an in-progress batch
@@ -108,9 +107,9 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch batch = dq.peekLast();
if (batch != null) {
- RecordSend send = batch.tryAppend(key, value, compression, callback);
- if (send != null)
- return send;
+ FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+ if (future != null)
+ return future;
}
}
@@ -120,19 +119,18 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch first = dq.peekLast();
if (first != null) {
- RecordSend send = first.tryAppend(key, value, compression, callback);
- if (send != null) {
- // somebody else found us a batch, return the one we waited for!
- // Hopefully this doesn't happen
+ FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
+ if (future != null) {
+ // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
// often...
free.deallocate(buffer);
- return send;
+ return future;
}
}
RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
- RecordSend send = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
dq.addLast(batch);
- return send;
+ return future;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
index 4a536a2..633f4af 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
@@ -4,7 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
import kafka.common.TopicPartition;
import kafka.common.record.CompressionType;
import kafka.common.record.MemoryRecords;
@@ -31,19 +31,20 @@ public final class RecordBatch {
}
/**
- * Append the message to the current message set and return the relative offset within that message set
+ * Append the record to the current record set and return the relative offset within that record set
*
- * @return The RecordSend corresponding to this message or null if there isn't sufficient room.
+ * @return The FutureRecordMetadata corresponding to this record or null if there isn't sufficient room.
*/
- public RecordSend tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
this.records.append(0L, key, value, compression);
- RecordSend send = new RecordSend(this.recordCount++, this.produceFuture);
+ FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
- thunks.add(new Thunk(callback, send));
- return send;
+ thunks.add(new Thunk(callback, this.recordCount));
+ this.recordCount++;
+ return future;
}
}
@@ -58,7 +59,12 @@ public final class RecordBatch {
// execute callbacks
for (int i = 0; i < this.thunks.size(); i++) {
try {
- this.thunks.get(i).execute();
+ Thunk thunk = this.thunks.get(i);
+ if (exception == null)
+ thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
+ null);
+ else
+ thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
e.printStackTrace();
}
@@ -70,15 +76,11 @@ public final class RecordBatch {
*/
final private static class Thunk {
final Callback callback;
- final RecordSend send;
+ final long relativeOffset;
- public Thunk(Callback callback, RecordSend send) {
+ public Thunk(Callback callback, long relativeOffset) {
this.callback = callback;
- this.send = send;
- }
-
- public void execute() {
- this.callback.onCompletion(this.send);
+ this.relativeOffset = relativeOffset;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
index effeb9c..5ac487b 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
@@ -283,7 +283,8 @@ public class Sender implements Runnable {
private void handleMetadataResponse(Struct body, long now) {
this.metadataFetchInProgress = false;
- this.metadata.update(ProtoUtils.parseMetadataResponse(body), now);
+ Cluster cluster = ProtoUtils.parseMetadataResponse(body);
+ this.metadata.update(cluster, now);
}
/**
@@ -377,7 +378,7 @@ public class Sender implements Runnable {
buffer.flip();
Struct part = topicData.instance("data")
.set("partition", parts.get(i).topicPartition.partition())
- .set("message_set", buffer);
+ .set("record_set", buffer);
partitionData[i] = part;
}
topicData.set("data", partitionData);
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
index 7331b73..973eb5e 100644
--- a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
@@ -7,47 +7,42 @@ import kafka.clients.producer.Callback;
import kafka.clients.producer.KafkaProducer;
import kafka.clients.producer.ProducerConfig;
import kafka.clients.producer.ProducerRecord;
-import kafka.clients.producer.RecordSend;
-import kafka.common.ByteSerialization;
+import kafka.clients.producer.RecordMetadata;
+import kafka.common.record.Records;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
if (args.length != 3) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_messages message_size");
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
System.exit(1);
}
String url = args[0];
- int numMessages = Integer.parseInt(args[1]);
- int messageSize = Integer.parseInt(args[2]);
+ int numRecords = Integer.parseInt(args[1]);
+ int recordSize = Integer.parseInt(args[2]);
Properties props = new Properties();
props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
- props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
- props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
KafkaProducer producer = new KafkaProducer(props);
Callback callback = new Callback() {
- public void onCompletion(RecordSend send) {
- try {
- send.offset();
- } catch (Exception e) {
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null)
e.printStackTrace();
- }
}
};
- byte[] payload = new byte[messageSize];
+ byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord("test", payload);
long start = System.currentTimeMillis();
long maxLatency = -1L;
long totalLatency = 0;
int reportingInterval = 1000000;
- for (int i = 0; i < numMessages; i++) {
+ for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis();
- producer.send(record, null);
+ producer.send(record, callback);
long sendEllapsed = System.currentTimeMillis() - sendStart;
maxLatency = Math.max(maxLatency, sendEllapsed);
totalLatency += sendEllapsed;
@@ -61,9 +56,9 @@ public class ProducerPerformance {
}
}
long ellapsed = System.currentTimeMillis() - start;
- double msgsSec = 1000.0 * numMessages / (double) ellapsed;
- double mbSec = msgsSec * messageSize / (1024.0 * 1024.0);
- System.out.printf("%d messages sent in %d ms ms. %.2f messages per second (%.2f mb/sec).", numMessages, ellapsed, msgsSec, mbSec);
+ double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+ double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+ System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
producer.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/ByteSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/ByteSerialization.java b/clients/src/main/java/kafka/common/ByteSerialization.java
deleted file mode 100644
index eca69f1..0000000
--- a/clients/src/main/java/kafka/common/ByteSerialization.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A serialization implementation that just retains the provided byte array unchanged
- */
-public class ByteSerialization implements Serializer, Deserializer {
-
- @Override
- public Object fromBytes(byte[] bytes) {
- return bytes;
- }
-
- @Override
- public byte[] toBytes(Object o) {
- return (byte[]) o;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Cluster.java b/clients/src/main/java/kafka/common/Cluster.java
index d0acd8d..8d045d5 100644
--- a/clients/src/main/java/kafka/common/Cluster.java
+++ b/clients/src/main/java/kafka/common/Cluster.java
@@ -12,13 +12,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import kafka.common.utils.Utils;
/**
- * A representation of the nodes, topics, and partitions in the Kafka cluster
+ * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/
public final class Cluster {
private final AtomicInteger counter = new AtomicInteger(0);
private final List<Node> nodes;
- private final Map<Integer, Node> nodesById;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
@@ -28,22 +27,28 @@ public final class Cluster {
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
- this.nodes = new ArrayList<Node>(nodes);
- this.nodesById = new HashMap<Integer, Node>(this.nodes.size());
- this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
- this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partitions.size());
+ // make a randomized, unmodifiable copy of the nodes
+ List<Node> copy = new ArrayList<Node>(nodes);
+ Collections.shuffle(copy);
+ this.nodes = Collections.unmodifiableList(copy);
- Collections.shuffle(this.nodes);
- for (Node n : this.nodes)
- this.nodesById.put(n.id(), n);
+ // index the partitions by topic/partition for quick lookup
+ this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
for (PartitionInfo p : partitions)
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+
+ // index the partitions by topic and make the lists unmodifiable so we can hand them out in
+ // user-facing apis without risk of the client modifying the contents
+ HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
for (PartitionInfo p : partitions) {
- if (!this.partitionsByTopic.containsKey(p.topic()))
- this.partitionsByTopic.put(p.topic(), new ArrayList<PartitionInfo>());
- List<PartitionInfo> ps = this.partitionsByTopic.get(p.topic());
+ if (!parts.containsKey(p.topic()))
+ parts.put(p.topic(), new ArrayList<PartitionInfo>());
+ List<PartitionInfo> ps = parts.get(p.topic());
ps.add(p);
}
+ this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
+ for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
+ this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
}
/**
@@ -67,6 +72,13 @@ public final class Cluster {
}
/**
+ * @return The known set of nodes
+ */
+ public List<Node> nodes() {
+ return this.nodes;
+ }
+
+ /**
* Get the current leader for the given topic-partition
* @param topicPartition The topic and partition we want to know the leader for
* @return The node that is the leader for this topic-partition, or null if there is currently no leader
@@ -76,7 +88,16 @@ public final class Cluster {
if (info == null)
return null;
else
- return nodesById.get(info.leader());
+ return info.leader();
+ }
+
+ /**
+ * Get the metadata for the specified partition
+ * @param topicPartition The topic and partition to fetch info for
+ * @return The metadata about the given topic and partition
+ */
+ public PartitionInfo partition(TopicPartition topicPartition) {
+ return partitionsByTopicPartition.get(topicPartition);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Deserializer.java b/clients/src/main/java/kafka/common/Deserializer.java
deleted file mode 100644
index ad2e784..0000000
--- a/clients/src/main/java/kafka/common/Deserializer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- * <p>
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Deserializer {
-
- /**
- * Map a byte[] to an object
- * @param bytes The bytes for the object (can be null)
- * @return The deserialized object (can return null)
- */
- public Object fromBytes(byte[] bytes);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/PartitionInfo.java b/clients/src/main/java/kafka/common/PartitionInfo.java
index f3f08dd..0e50ed7 100644
--- a/clients/src/main/java/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/kafka/common/PartitionInfo.java
@@ -7,11 +7,11 @@ public class PartitionInfo {
private final String topic;
private final int partition;
- private final int leader;
- private final int[] replicas;
- private final int[] inSyncReplicas;
+ private final Node leader;
+ private final Node[] replicas;
+ private final Node[] inSyncReplicas;
- public PartitionInfo(String topic, int partition, int leader, int[] replicas, int[] inSyncReplicas) {
+ public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
@@ -36,14 +36,14 @@ public class PartitionInfo {
/**
* The node id of the node currently acting as a leader for this partition or -1 if there is no leader
*/
- public int leader() {
+ public Node leader() {
return leader;
}
/**
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/
- public int[] replicas() {
+ public Node[] replicas() {
return replicas;
}
@@ -51,7 +51,7 @@ public class PartitionInfo {
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail
*/
- public int[] inSyncReplicas() {
+ public Node[] inSyncReplicas() {
return inSyncReplicas;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Serializer.java b/clients/src/main/java/kafka/common/Serializer.java
deleted file mode 100644
index 63353d8..0000000
--- a/clients/src/main/java/kafka/common/Serializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- * <p>
- * An implementation should handle null inputs.
- * <p>
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Serializer {
-
- /**
- * Translate an object into bytes. The serializer must handle null inputs, and will generally just return null in
- * this case.
- * @param o The object to serialize, can be null
- * @return The serialized bytes for the object or null
- */
- public byte[] toBytes(Object o);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/StringSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/StringSerialization.java b/clients/src/main/java/kafka/common/StringSerialization.java
deleted file mode 100644
index c0ed5ca..0000000
--- a/clients/src/main/java/kafka/common/StringSerialization.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka.common;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-/**
- * A serializer and deserializer for strings.
- * <p>
- * This class accepts a configuration parameter string.encoding which can take the string name of any supported
- * encoding. If no encoding is specified the default will be UTF-8.
- */
-public class StringSerialization implements Serializer, Deserializer, Configurable {
-
- private final static String ENCODING_CONFIG = "string.encoding";
-
- private String encoding;
-
- public StringSerialization(String encoding) {
- super();
- this.encoding = encoding;
- }
-
- public StringSerialization() {
- this("UTF8");
- }
-
- public void configure(Map<String, ?> configs) {
- if (configs.containsKey(ENCODING_CONFIG))
- this.encoding = (String) configs.get(ENCODING_CONFIG);
- }
-
- @Override
- public Object fromBytes(byte[] bytes) {
- if (bytes == null) {
- return null;
- } else {
- try {
- return new String(bytes, encoding);
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
- }
-
- @Override
- public byte[] toBytes(Object o) {
- if (o == null) {
- return null;
- } else {
- try {
- return ((String) o).getBytes(encoding);
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java b/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
deleted file mode 100644
index faf6234..0000000
--- a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class CorruptMessageException extends ApiException {
-
- private static final long serialVersionUID = 1L;
-
- public CorruptMessageException() {
- super("This message has failed it's CRC checksum or is otherwise corrupt.");
- }
-
- public CorruptMessageException(String message) {
- super(message);
- }
-
- public CorruptMessageException(Throwable cause) {
- super(cause);
- }
-
- public CorruptMessageException(String message, Throwable cause) {
- super(message, cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
new file mode 100644
index 0000000..492f2e3
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class CorruptRecordException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CorruptRecordException() {
+ super("This message has failed it's CRC checksum or is otherwise corrupt.");
+ }
+
+ public CorruptRecordException(String message) {
+ super(message);
+ }
+
+ public CorruptRecordException(Throwable cause) {
+ super(cause);
+ }
+
+ public CorruptRecordException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java b/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
deleted file mode 100644
index 7417906..0000000
--- a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class MessageTooLargeException extends ApiException {
-
- private static final long serialVersionUID = 1L;
-
- public MessageTooLargeException() {
- super();
- }
-
- public MessageTooLargeException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public MessageTooLargeException(String message) {
- super(message);
- }
-
- public MessageTooLargeException(Throwable cause) {
- super(cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
new file mode 100644
index 0000000..bef4293
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class RecordTooLargeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RecordTooLargeException() {
+ super();
+ }
+
+ public RecordTooLargeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RecordTooLargeException(String message) {
+ super(message);
+ }
+
+ public RecordTooLargeException(Throwable cause) {
+ super(cause);
+ }
+
+}