Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/04 17:40:20 UTC

[2/2] git commit: Implement a few of the API suggestions from the mailing list.

Implement a few of the API suggestions from the mailing list.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/253f86e3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/253f86e3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/253f86e3

Branch: refs/heads/trunk
Commit: 253f86e31062fb86401abdc13835c251eef47417
Parents: c9052c5
Author: Jay Kreps <ja...@gmail.com>
Authored: Sun Feb 2 21:52:25 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 4 08:39:56 2014 -0800

----------------------------------------------------------------------
 .../java/kafka/clients/producer/Callback.java   |  13 +-
 .../clients/producer/DefaultPartitioner.java    |  35 ----
 .../kafka/clients/producer/KafkaProducer.java   | 164 ++++++++++++-------
 .../kafka/clients/producer/MockProducer.java    |  82 +++++-----
 .../kafka/clients/producer/Partitioner.java     |  30 ----
 .../java/kafka/clients/producer/Producer.java   |  18 +-
 .../kafka/clients/producer/ProducerConfig.java  |  46 ++----
 .../kafka/clients/producer/ProducerRecord.java  |  46 +++---
 .../kafka/clients/producer/RecordMetadata.java  |  39 +++++
 .../java/kafka/clients/producer/RecordSend.java |  88 ----------
 .../internals/FutureRecordMetadata.java         |  63 +++++++
 .../clients/producer/internals/Partitioner.java |  55 +++++++
 .../internals/ProduceRequestResult.java         |  34 ++--
 .../producer/internals/RecordAccumulator.java   |  28 ++--
 .../clients/producer/internals/RecordBatch.java |  32 ++--
 .../clients/producer/internals/Sender.java      |   5 +-
 .../clients/tools/ProducerPerformance.java      |  31 ++--
 .../java/kafka/common/ByteSerialization.java    |  18 --
 clients/src/main/java/kafka/common/Cluster.java |  47 ++++--
 .../main/java/kafka/common/Deserializer.java    |  18 --
 .../main/java/kafka/common/PartitionInfo.java   |  14 +-
 .../src/main/java/kafka/common/Serializer.java  |  21 ---
 .../java/kafka/common/StringSerialization.java  |  58 -------
 .../common/errors/CorruptMessageException.java  |  23 ---
 .../common/errors/CorruptRecordException.java   |  23 +++
 .../common/errors/MessageTooLargeException.java |  23 ---
 .../common/errors/RecordTooLargeException.java  |  23 +++
 .../kafka/common/network/ByteBufferReceive.java |   4 +-
 .../main/java/kafka/common/protocol/Errors.java |   8 +-
 .../java/kafka/common/protocol/ProtoUtils.java  |  28 ++--
 .../java/kafka/common/protocol/Protocol.java    |   2 +-
 .../java/kafka/common/record/MemoryRecords.java |   4 +-
 .../main/java/kafka/common/record/Record.java   |   4 +-
 .../kafka/clients/producer/MetadataTest.java    |  10 +-
 .../clients/producer/MockProducerTest.java      |  67 ++++----
 .../kafka/clients/producer/PartitionerTest.java |  54 ++++++
 .../kafka/clients/producer/RecordSendTest.java  |  32 ++--
 .../java/kafka/clients/producer/SenderTest.java |  17 +-
 clients/src/test/java/kafka/test/TestUtils.java |  22 +++
 39 files changed, 670 insertions(+), 659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java
index 47e5af3..d287d78 100644
--- a/clients/src/main/java/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/kafka/clients/producer/Callback.java
@@ -2,14 +2,17 @@ package kafka.clients.producer;
 
 /**
  * A callback interface that the user can implement to allow code to execute when the request is complete. This callback
- * will execute in the background I/O thread so it should be fast.
+ * will generally execute in the background I/O thread so it should be fast.
  */
 public interface Callback {
 
     /**
-     * A callback method the user should implement. This method will be called when the send to the server has
-     * completed.
-     * @param send The results of the call. This send is guaranteed to be completed so none of its methods will block.
+     * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+     * non-null.
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+     *        occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
      */
-    public void onCompletion(RecordSend send);
+    public void onCompletion(RecordMetadata metadata, Exception exception);
 }

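A minimal sketch of an implementation against the new two-argument signature (the class name and log output are illustrative, not part of the commit):

    import kafka.clients.producer.Callback;
    import kafka.clients.producer.RecordMetadata;

    public class LoggingCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Exactly one of the two arguments is non-null, so check the exception first.
            if (exception != null)
                exception.printStackTrace();
            else
                System.out.println("acked " + metadata.topic() + "-"
                                   + metadata.partition() + " @ " + metadata.offset());
        }
    }
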
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
deleted file mode 100644
index b82fcfb..0000000
--- a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.common.Cluster;
-import kafka.common.utils.Utils;
-
-/**
- * A simple partitioning strategy that will work for messages with or without keys.
- * <p>
- * If there is a partition key specified in the record the partitioner will use that for partitioning. Otherwise, if
- * there there is no partitionKey but there is a normal key that will be used. If neither key is specified the
- * partitioner will round-robin over partitions in the topic.
- * <p>
- * For the cases where there is some key present the partition is computed based on the murmur2 hash of the serialized
- * key.
- */
-public class DefaultPartitioner implements Partitioner {
-
-    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
-    /**
-     * Compute the partition
-     */
-    @Override
-    public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions) {
-        byte[] keyToUse = partitionKey != null ? partitionKey : key;
-        if (keyToUse == null)
-            return Utils.abs(counter.getAndIncrement()) % numPartitions;
-        else
-            return Utils.abs(Utils.murmur2(keyToUse)) % numPartitions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
index 58eee0c..1dd63fc 100644
--- a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
@@ -6,17 +6,22 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import kafka.clients.producer.internals.FutureRecordMetadata;
 import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.Partitioner;
 import kafka.clients.producer.internals.RecordAccumulator;
 import kafka.clients.producer.internals.Sender;
 import kafka.common.Cluster;
 import kafka.common.KafkaException;
 import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
 import kafka.common.TopicPartition;
 import kafka.common.config.ConfigException;
-import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.RecordTooLargeException;
 import kafka.common.metrics.JmxReporter;
 import kafka.common.metrics.MetricConfig;
 import kafka.common.metrics.Metrics;
@@ -29,24 +34,22 @@ import kafka.common.utils.KafkaThread;
 import kafka.common.utils.SystemTime;
 
 /**
- * A Kafka producer that can be used to send data to the Kafka cluster.
+ * A Kafka client that publishes records to the Kafka cluster.
  * <P>
  * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
  * <p>
  * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these.
+ * needs to communicate with. Failure to close the producer after use will leak these resources.
  */
 public class KafkaProducer implements Producer {
 
+    private final Partitioner partitioner;
     private final int maxRequestSize;
     private final long metadataFetchTimeoutMs;
     private final long totalMemorySize;
-    private final Partitioner partitioner;
     private final Metadata metadata;
     private final RecordAccumulator accumulator;
     private final Sender sender;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
     private final Metrics metrics;
     private final Thread ioThread;
 
@@ -72,9 +75,7 @@ public class KafkaProducer implements Producer {
         this.metrics = new Metrics(new MetricConfig(),
                                    Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
                                    new SystemTime());
-        this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+        this.partitioner = new Partitioner();
         this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
         this.metadata = new Metadata();
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
@@ -126,78 +127,84 @@ public class KafkaProducer implements Producer {
      * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
      */
     @Override
-    public RecordSend send(ProducerRecord record) {
+    public Future<RecordMetadata> send(ProducerRecord record) {
         return send(record, null);
     }
 
     /**
      * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
      * <p>
-     * The send is asynchronous and this method will return immediately once the record has been serialized and stored
-     * in the buffer of messages waiting to be sent. This allows sending many records in parallel without necessitating
-     * blocking to wait for the response after each one.
+     * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
+     * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
+     * response after each one.
+     * <p>
+     * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
+     * it was assigned.
+     * <p>
+     * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
+     * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
+     * get()} on this future will return the metadata for the record or throw any exception that occurred while
+     * sending the record.
      * <p>
-     * The {@link RecordSend} returned by this call will hold the future response data including the offset assigned to
-     * the message and the error (if any) when the request has completed (or returned an error), and this object can be
-     * used to block awaiting the response. If you want the equivalent of a simple blocking send you can easily achieve
-     * that using the {@link kafka.clients.producer.RecordSend#await() await()} method on the {@link RecordSend} this
-     * call returns:
+     * If you want to simulate a simple blocking call you can do the following:
      * 
      * <pre>
-     *   ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
-     *   producer.send(myRecord, null).await();
+     *   producer.send(new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes())).get();
      * </pre>
-     * 
-     * Note that the send method will not throw an exception if the request fails while communicating with the cluster,
-     * rather that exception will be thrown when accessing the {@link RecordSend} that is returned.
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
-     * will be invoked when the request is complete. Note that the callback will execute in the I/O thread of the
-     * producer and so should be reasonably fast. An example usage of an inline callback would be the following:
+     * will be invoked when the request is complete.
      * 
      * <pre>
     *   ProducerRecord record = new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
     *   producer.send(record,
      *                 new Callback() {
-     *                     public void onCompletion(RecordSend send) {
-     *                         try {
-     *                             System.out.println("The offset of the message we just sent is: " + send.offset());
-     *                         } catch(KafkaException e) {
+     *                     public void onCompletion(RecordMetadata metadata, Exception e) {
+     *                         if(e != null)
      *                             e.printStackTrace();
-     *                         }
+     *                         else
+     *                             System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
      *                 });
      * </pre>
+     * 
+     * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
+     * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
+     * 
+     * <pre>
+     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+     * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+     * </pre>
      * <p>
-     * This call enqueues the message in the buffer of outgoing messages to be sent. This buffer has a hard limit on
-     * it's size controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called
-     * faster than the I/O thread can send data to the brokers we will eventually run out of buffer space. The default
-     * behavior in this case is to block the send call until the I/O thread catches up and more buffer space is
-     * available. However if non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will
-     * cause the producer to instead throw an exception when this occurs.
+     * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
+     * they will delay the sending of records from other threads. If you want to execute blocking or computationally
+     * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
+     * to parallelize processing.
+     * <p>
+     * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is
+     * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
+     * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
+     * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
+     * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
+     * producer to instead throw an exception when buffer memory is exhausted.
      * 
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
-     * @throws BufferExhausedException This exception is thrown if the buffer is full and blocking has been disabled.
-     * @throws MessageTooLargeException This exception is thrown if the serialized size of the message is larger than
-     *         the maximum buffer memory or maximum request size that has been configured (whichever is smaller).
      */
     @Override
-    public RecordSend send(ProducerRecord record, Callback callback) {
-        Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
-        byte[] key = keySerializer.toBytes(record.key());
-        byte[] value = valueSerializer.toBytes(record.value());
-        byte[] partitionKey = keySerializer.toBytes(record.partitionKey());
-        int partition = partitioner.partition(record, key, partitionKey, value, cluster, cluster.partitionsFor(record.topic()).size());
-        ensureValidSize(key, value);
+    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
         try {
+            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+            int partition = partitioner.partition(record, cluster);
+            ensureValidSize(record.key(), record.value());
             TopicPartition tp = new TopicPartition(record.topic(), partition);
-            RecordSend send = accumulator.append(tp, key, value, CompressionType.NONE, callback);
+            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
             this.sender.wakeup();
-            return send;
-        } catch (InterruptedException e) {
-            throw new KafkaException(e);
+            return future;
+        } catch (Exception e) {
+            if (callback != null)
+                callback.onCompletion(null, e);
+            return new FutureFailure(e);
         }
     }
 
@@ -207,15 +214,19 @@ public class KafkaProducer implements Producer {
     private void ensureValidSize(byte[] key, byte[] value) {
         int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
         if (serializedSize > this.maxRequestSize)
-            throw new MessageTooLargeException("The message is " + serializedSize
-                                               + " bytes when serialized which is larger than the maximum request size you have configured with the "
-                                               + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
-                                               + " configuration.");
+            throw new RecordTooLargeException("The message is " + serializedSize
+                                              + " bytes when serialized which is larger than the maximum request size you have configured with the "
+                                              + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+                                              + " configuration.");
         if (serializedSize > this.totalMemorySize)
-            throw new MessageTooLargeException("The message is " + serializedSize
-                                               + " bytes when serialized which is larger than the total memory buffer you have configured with the "
-                                               + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
-                                               + " configuration.");
+            throw new RecordTooLargeException("The message is " + serializedSize
+                                              + " bytes when serialized which is larger than the total memory buffer you have configured with the "
+                                              + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+                                              + " configuration.");
+    }
+
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
     }
 
     @Override
@@ -237,4 +248,39 @@ public class KafkaProducer implements Producer {
         this.metrics.close();
     }
 
+    private static class FutureFailure implements Future<RecordMetadata> {
+
+        private final ExecutionException exception;
+
+        public FutureFailure(Exception exception) {
+            this.exception = new ExecutionException(exception);
+        }
+
+        @Override
+        public boolean cancel(boolean interrupt) {
+            return false;
+        }
+
+        @Override
+        public RecordMetadata get() throws ExecutionException {
+            throw this.exception;
+        }
+
+        @Override
+        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+            throw this.exception;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return true;
+        }
+
+    }
+
 }

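Since send() now surfaces failures through the returned Future (or the callback) rather than throwing, and even pre-send failures come back wrapped via FutureFailure, a blocking caller unwraps the original error from the ExecutionException. A sketch, assuming an already-configured producer instance:

    Future<RecordMetadata> future =
        producer.send(new ProducerRecord("the-topic", "value".getBytes()));
    try {
        RecordMetadata metadata = future.get(); // blocks until acknowledged
        System.out.println("written at offset " + metadata.offset());
    } catch (ExecutionException e) {
        // the cause is the original failure, e.g. RecordTooLargeException
        e.getCause().printStackTrace();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
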
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java
index 2ea2030..ab83d5f 100644
--- a/clients/src/main/java/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/MockProducer.java
@@ -7,11 +7,14 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
+import kafka.clients.producer.internals.FutureRecordMetadata;
+import kafka.clients.producer.internals.Partitioner;
 import kafka.clients.producer.internals.ProduceRequestResult;
 import kafka.common.Cluster;
 import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
 import kafka.common.TopicPartition;
 
 /**
@@ -22,10 +25,8 @@ import kafka.common.TopicPartition;
  */
 public class MockProducer implements Producer {
 
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
-    private final Partitioner partitioner;
     private final Cluster cluster;
+    private final Partitioner partitioner = new Partitioner();
     private final List<ProducerRecord> sent;
     private final Deque<Completion> completions;
     private boolean autoComplete;
@@ -34,21 +35,13 @@ public class MockProducer implements Producer {
     /**
      * Create a mock producer
      * 
-     * @param keySerializer A serializer to use on keys (useful to test your serializer on the values)
-     * @param valueSerializer A serializer to use on values (useful to test your serializer on the values)
-     * @param partitioner A partitioner to choose partitions (if null the partition will always be 0)
-     * @param cluster The cluster to pass to the partitioner (can be null if partitioner is null)
+     * @param cluster The cluster holding metadata for this producer
      * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
      *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
-     *        {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link RecordSend} that is
-     *        returned.
+     *        {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
+     *        java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
      */
-    public MockProducer(Serializer keySerializer, Serializer valueSerializer, Partitioner partitioner, Cluster cluster, boolean autoComplete) {
-        if (partitioner != null && (cluster == null | keySerializer == null | valueSerializer == null))
-            throw new IllegalArgumentException("If a partitioner is provided a cluster instance and key and value serializer for partitioning must also be given.");
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
-        this.partitioner = partitioner;
+    public MockProducer(Cluster cluster, boolean autoComplete) {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.offsets = new HashMap<TopicPartition, Long>();
@@ -57,17 +50,16 @@ public class MockProducer implements Producer {
     }
 
     /**
-     * Create a new mock producer with no serializers or partitioner and the given autoComplete setting.
+     * Create a new mock producer with invented metadata and the given autoComplete setting.
      * 
-     * Equivalent to {@link #MockProducer(Serializer, Serializer, Partitioner, Cluster, boolean) new MockProducer(null,
-     * null, null, null, autoComplete)}
+     * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(Cluster.empty(), autoComplete)}
      */
     public MockProducer(boolean autoComplete) {
-        this(null, null, null, null, autoComplete);
+        this(Cluster.empty(), autoComplete);
     }
 
     /**
-     * Create a new auto completing mock producer with no serializers or partitioner.
+     * Create a new auto-completing mock producer
      * 
      * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
      */
@@ -76,39 +68,36 @@ public class MockProducer implements Producer {
     }
 
     /**
-     * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied.
+     * Adds the record to the list of sent records. The {@link java.util.concurrent.Future Future} returned will be immediately satisfied.
      * 
      * @see #history()
      */
     @Override
-    public synchronized RecordSend send(ProducerRecord record) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord record) {
         return send(record, null);
     }
 
     /**
-     * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied and
-     * the callback will be synchronously executed.
+     * Adds the record to the list of sent records.
      * 
      * @see #history()
      */
     @Override
-    public synchronized RecordSend send(ProducerRecord record, Callback callback) {
-        byte[] key = keySerializer == null ? null : keySerializer.toBytes(record.key());
-        byte[] partitionKey = keySerializer == null ? null : keySerializer.toBytes(record.partitionKey());
-        byte[] value = valueSerializer == null ? null : valueSerializer.toBytes(record.value());
-        int numPartitions = partitioner == null ? 0 : this.cluster.partitionsFor(record.topic()).size();
-        int partition = partitioner == null ? 0 : partitioner.partition(record, key, partitionKey, value, this.cluster, numPartitions);
+    public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+        int partition = 0;
+        if (this.cluster.partitionsFor(record.topic()) != null)
+            partition = partitioner.partition(record, this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
-        RecordSend send = new RecordSend(0, result);
+        FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         long offset = nextOffset(topicPartition);
-        Completion completion = new Completion(topicPartition, offset, send, result, callback);
+        Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
         this.sent.add(record);
         if (autoComplete)
             completion.complete(null);
         else
             this.completions.addLast(completion);
-        return send;
+        return future;
     }
 
     /**
@@ -126,13 +115,14 @@ public class MockProducer implements Producer {
         }
     }
 
+    public List<PartitionInfo> partitionsFor(String topic) {
+        return this.cluster.partitionsFor(topic);
+    }
+
     public Map<String, Metric> metrics() {
         return Collections.emptyMap();
     }
 
-    /**
-     * "Closes" the producer
-     */
     @Override
     public void close() {
     }
@@ -178,13 +168,17 @@ public class MockProducer implements Producer {
 
     private static class Completion {
         private final long offset;
-        private final RecordSend send;
+        private final RecordMetadata metadata;
         private final ProduceRequestResult result;
         private final Callback callback;
         private final TopicPartition topicPartition;
 
-        public Completion(TopicPartition topicPartition, long offset, RecordSend send, ProduceRequestResult result, Callback callback) {
-            this.send = send;
+        public Completion(TopicPartition topicPartition,
+                          long offset,
+                          RecordMetadata metadata,
+                          ProduceRequestResult result,
+                          Callback callback) {
+            this.metadata = metadata;
             this.offset = offset;
             this.result = result;
             this.callback = callback;
@@ -193,8 +187,12 @@ public class MockProducer implements Producer {
 
         public void complete(RuntimeException e) {
             result.done(topicPartition, e == null ? offset : -1L, e);
-            if (callback != null)
-                callback.onCompletion(send);
+            if (callback != null) {
+                if (e == null)
+                    callback.onCompletion(metadata, null);
+                else
+                    callback.onCompletion(null, e);
+            }
         }
     }
 

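With autoComplete disabled, a test can now drive each request to completion (or failure) by hand. A sketch of the manual-completion flow under these new signatures (checked exceptions from get() are elided):

    MockProducer producer = new MockProducer(false); // manual completion
    Future<RecordMetadata> future =
        producer.send(new ProducerRecord("the-topic", "value".getBytes()));
    // future.isDone() stays false until the test completes the request
    producer.completeNext();                // acknowledge the oldest outstanding send
    long offset = future.get().offset();    // now returns without blocking
    // producer.errorNext(new RuntimeException("boom")) would fail it instead
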
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Partitioner.java b/clients/src/main/java/kafka/clients/producer/Partitioner.java
deleted file mode 100644
index 1b8e51f..0000000
--- a/clients/src/main/java/kafka/clients/producer/Partitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package kafka.clients.producer;
-
-import kafka.common.Cluster;
-
-/**
- * An interface by which clients can override the default partitioning behavior that maps records to topic partitions.
- * <p>
- * A partitioner can use either the original java object the user provided or the serialized bytes.
- * <p>
- * It is expected that the partitioner will make use the key for partitioning, but there is no requirement that an
- * implementation do so. An implementation can use the key, the value, the state of the cluster, or any other side data.
- */
-public interface Partitioner {
-
-    /**
-     * Compute the partition for the given record. This partition number must be in the range [0...numPartitions). The
-     * cluster state provided is the most up-to-date view that the client has but leadership can change at any time so
-     * there is no guarantee that the node that is the leader for a particular partition at the time the partition
-     * function is called will still be the leader by the time the request is sent.
-     * 
-     * @param record The record being sent
-     * @param key The serialized bytes of the key (null if no key is given or the serialized form is null)
-     * @param value The serialized bytes of the value (null if no value is given or the serialized form is null)
-     * @param cluster The current state of the cluster
-     * @param numPartitions The total number of partitions for the given topic
-     * @return The partition to send this record to
-     */
-    public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java
index 6ba6633..f149f3a 100644
--- a/clients/src/main/java/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/kafka/clients/producer/Producer.java
@@ -1,8 +1,12 @@
 package kafka.clients.producer;
 
+import java.io.Closeable;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import kafka.common.Metric;
+import kafka.common.PartitionInfo;
 
 /**
  * The interface for the {@link KafkaProducer}
@@ -10,7 +14,7 @@ import kafka.common.Metric;
  * @see KafkaProducer
  * @see MockProducer
  */
-public interface Producer {
+public interface Producer extends Closeable {
 
     /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -18,12 +22,18 @@ public interface Producer {
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
-    public RecordSend send(ProducerRecord record);
+    public Future<RecordMetadata> send(ProducerRecord record);
 
     /**
-     * Send a message and invoke the given callback when the send is complete
+     * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
-    public RecordSend send(ProducerRecord record, Callback callback);
+    public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
+
+    /**
+     * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
+     * over time so this list should not be cached.
+     */
+    public List<PartitionInfo> partitionsFor(String topic);
 
     /**
      * Return a map of metrics maintained by the producer

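The new partitionsFor() method makes custom partition assignment possible outside the client. A sketch that pins a record to a partition derived from the key (the hashing scheme here is illustrative):

    byte[] key = "user-42".getBytes();
    List<PartitionInfo> partitions = producer.partitionsFor("the-topic");
    // any deterministic function of the key works; re-fetch the list rather than caching it
    int partition = (java.util.Arrays.hashCode(key) & 0x7fffffff) % partitions.size();
    producer.send(new ProducerRecord("the-topic", partition, key, "value".getBytes()));
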
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
index 9758293..a94afc7 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
@@ -25,19 +25,19 @@ public class ProducerConfig extends AbstractConfig {
     public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
 
     /**
-     * The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that
+     * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
      * topic.
      */
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
 
     /**
-     * The buffer size allocated for a partition. When messages are received which are smaller than this size the
+     * The buffer size allocated for a partition. When records smaller than this size are received, the
      * producer will attempt to optimistically group them together until this size is reached.
      */
     public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
 
     /**
-     * The total memory used by the producer to buffer messages waiting to be sent to the server. If messages are sent
+     * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
      * faster than they can be delivered to the server the producer will either block or throw an exception based on the
      * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
      */
@@ -56,32 +56,17 @@ public class ProducerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
 
     /**
-     * The producer groups together any messages that arrive in between request sends. Normally this occurs only under
-     * load when messages arrive faster than they can be sent out. However the client can reduce the number of requests
-     * and increase throughput by adding a small amount of artificial delay to force more messages to batch together.
-     * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of messages
+     * The producer groups together any records that arrive in between request sends. Normally this occurs only under
+     * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
+     * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
+     * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
      * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
-     * bytes accumulated for this partition we will "linger" for the specified time waiting for more messages to show
-     * up. This setting defaults to 0.
+     * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
+     * This setting defaults to 0.
      */
     public static final String LINGER_MS_CONFIG = "linger.ms";
 
     /**
-     * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record values.
-     */
-    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer.class";
-
-    /**
-     * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record keys.
-     */
-    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer.class";
-
-    /**
-     * The class to use for choosing a partition to send the message to
-     */
-    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
-
-    /**
      * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
     * partitions or other settings will be taken up by producers without restart.
      */
@@ -99,8 +84,8 @@ public class ProducerConfig extends AbstractConfig {
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
 
     /**
-     * The maximum size of a request. This is also effectively a cap on the maximum message size. Note that the server
-     * has its own cap on message size which may be different from this.
+     * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
+     * has its own cap on record size which may be different from this.
      */
     public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
 
@@ -111,9 +96,9 @@ public class ProducerConfig extends AbstractConfig {
     public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
 
     /**
-     * When our memory buffer is exhausted we must either stop accepting new messages (block) or throw errors. By
-     * default this setting is true and we block, however users who want to guarantee we never block can turn this into
-     * an error.
+     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
+     * this setting is true and we block, however users who want to guarantee we never block can turn this into an
+     * error.
      */
     public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
 
@@ -129,9 +114,6 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
                                 .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
-                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
-                                .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, DefaultPartitioner.class.getName(), "blah blah")
                                 .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")

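With the serializer and partitioner settings removed, a minimal configuration needs only the broker list. A sketch (the Properties-based constructor is assumed; values are illustrative):

    Properties props = new Properties();
    props.put(ProducerConfig.BROKER_LIST_CONFIG, "broker1:9092,broker2:9092"); // required
    props.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, "67108864");          // 64 MB of buffer
    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL, "false"); // throw rather than block when full
    Producer producer = new KafkaProducer(props);
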
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
index 5fddbef..b9c20bc 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
@@ -1,34 +1,34 @@
 package kafka.clients.producer;
 
 /**
- * An unserialized key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent,
- * a value (which can be null) which is the contents of the record and an optional key (which can also be null). In
- * cases the key used for choosing a partition is going to be different the user can specify a partition key which will
- * be used only for computing the partition to which this record will be sent and will not be retained with the record.
+ * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
+ * partition number, and an optional key and value.
+ * <p>
+ * If a valid partition number is specified that partition will be used when sending the record. If no partition is
+ * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
+ * present a partition will be assigned in a round-robin fashion.
  */
 public final class ProducerRecord {
 
     private final String topic;
-    private final Object key;
-    private final Object partitionKey;
-    private final Object value;
+    private final Integer partition;
+    private final byte[] key;
+    private final byte[] value;
 
     /**
-     * Creates a record to be sent to Kafka using a special override key for partitioning that is different form the key
-     * retained in the record
+     * Creates a record to be sent to a specified topic and partition
      * 
      * @param topic The topic the record will be appended to
+     * @param partition The partition to which the record should be sent
      * @param key The key that will be included in the record
-     * @param partitionKey An override for the key to be used only for partitioning purposes in the client. This key
-     *        will not be retained or available to downstream consumers.
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Object key, Object partitionKey, Object value) {
+    public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
+        this.partition = partition;
         this.key = key;
-        this.partitionKey = partitionKey;
         this.value = value;
     }
 
@@ -39,8 +39,8 @@ public final class ProducerRecord {
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Object key, Object value) {
-        this(topic, key, key, value);
+    public ProducerRecord(String topic, byte[] key, byte[] value) {
+        this(topic, null, key, value);
     }
 
     /**
@@ -49,7 +49,7 @@ public final class ProducerRecord {
      * @param topic The topic this record should be sent to
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Object value) {
+    public ProducerRecord(String topic, byte[] value) {
         this(topic, null, value);
     }
 
@@ -63,22 +63,22 @@ public final class ProducerRecord {
     /**
      * The key (or null if no key is specified)
      */
-    public Object key() {
+    public byte[] key() {
         return key;
     }
 
     /**
-     * An override key to use instead of the main record key
+     * @return The value
      */
-    public Object partitionKey() {
-        return partitionKey;
+    public byte[] value() {
+        return value;
     }
 
     /**
-     * @return The value
+     * The partition to which the record will be sent (or null if no partition was specified)
      */
-    public Object value() {
-        return value;
+    public Integer partition() {
+        return partition;
     }
 
 }

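The three constructor forms map directly onto the partitioning rules described in the class javadoc:

    // explicit partition: used as given (validated against the topic's partition count)
    new ProducerRecord("the-topic", 0, "key".getBytes(), "value".getBytes());
    // key but no partition: partition chosen by a hash of the key bytes
    new ProducerRecord("the-topic", "key".getBytes(), "value".getBytes());
    // value only: partition assigned in a round-robin fashion
    new ProducerRecord("the-topic", "value".getBytes());
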
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
new file mode 100644
index 0000000..1486586
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
@@ -0,0 +1,39 @@
+package kafka.clients.producer;
+
+import kafka.common.TopicPartition;
+
+/**
+ * The metadata for a record that has been acknowledged by the server
+ */
+public final class RecordMetadata {
+
+    private final long offset;
+    private final TopicPartition topicPartition;
+
+    public RecordMetadata(TopicPartition topicPartition, long offset) {
+        this.offset = offset;
+        this.topicPartition = topicPartition;
+    }
+
+    /**
+     * The offset of the record in the topic/partition.
+     */
+    public long offset() {
+        return this.offset;
+    }
+
+    /**
+     * The topic the record was appended to
+     */
+    public String topic() {
+        return this.topicPartition.topic();
+    }
+
+    /**
+     * The partition the record was sent to
+     */
+    public int partition() {
+        return this.topicPartition.partition();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/RecordSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordSend.java b/clients/src/main/java/kafka/clients/producer/RecordSend.java
deleted file mode 100644
index 1883dab..0000000
--- a/clients/src/main/java/kafka/clients/producer/RecordSend.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.concurrent.TimeUnit;
-
-import kafka.clients.producer.internals.ProduceRequestResult;
-import kafka.common.errors.ApiException;
-import kafka.common.errors.TimeoutException;
-
-/**
- * An asynchronously computed response from sending a record. Calling <code>await()</code> or most of the other accessor
- * methods will block until the response for this record is available. If you wish to avoid blocking provide a
- * {@link kafka.clients.producer.Callback Callback} with the record send.
- */
-public final class RecordSend {
-
-    private final long relativeOffset;
-    private final ProduceRequestResult result;
-
-    public RecordSend(long relativeOffset, ProduceRequestResult result) {
-        this.relativeOffset = relativeOffset;
-        this.result = result;
-    }
-
-    /**
-     * Block until this send has completed successfully. If the request fails, throw the error that occurred in sending
-     * the request.
-     * @return the same object for chaining of calls
-     * @throws TimeoutException if the thread is interrupted while waiting
-     * @throws ApiException if the request failed.
-     */
-    public RecordSend await() {
-        result.await();
-        if (result.error() != null)
-            throw result.error();
-        return this;
-    }
-
-    /**
-     * Block until this send is complete or the given timeout elapses
-     * @param timeout the time to wait
-     * @param unit the units of the time given
-     * @return the same object for chaining
-     * @throws TimeoutException if the request isn't satisfied in the time period given or the thread is interrupted
-     *         while waiting
-     * @throws ApiException if the request failed.
-     */
-    public RecordSend await(long timeout, TimeUnit unit) {
-        boolean success = result.await(timeout, unit);
-        if (!success)
-            throw new TimeoutException("Request did not complete after " + timeout + " " + unit);
-        if (result.error() != null)
-            throw result.error();
-        return this;
-    }
-
-    /**
-     * Get the offset for the given message. This method will block until the request is complete and will throw an
-     * exception if the request fails.
-     * @return The offset
-     */
-    public long offset() {
-        await();
-        return this.result.baseOffset() + this.relativeOffset;
-    }
-
-    /**
-     * Check if the request is complete without blocking
-     */
-    public boolean completed() {
-        return this.result.completed();
-    }
-
-    /**
-     * Block on request completion and return true if there was an error.
-     */
-    public boolean hasError() {
-        result.await();
-        return this.result.error() != null;
-    }
-
-    /**
-     * Return the error thrown
-     */
-    public Exception error() {
-        result.await();
-        return this.result.error();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
new file mode 100644
index 0000000..43b4c5d
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -0,0 +1,63 @@
+package kafka.clients.producer.internals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import kafka.clients.producer.RecordMetadata;
+
+/**
+ * The future result of a record send
+ */
+public final class FutureRecordMetadata implements Future<RecordMetadata> {
+
+    private final ProduceRequestResult result;
+    private final long relativeOffset;
+
+    public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
+        this.result = result;
+        this.relativeOffset = relativeOffset;
+    }
+
+    @Override
+    public boolean cancel(boolean interrupt) {
+        return false;
+    }
+
+    @Override
+    public RecordMetadata get() throws InterruptedException, ExecutionException {
+        this.result.await();
+        return valueOrError();
+    }
+
+    @Override
+    public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        boolean occurred = this.result.await(timeout, unit);
+        if (!occurred)
+            throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+        return valueOrError();
+    }
+
+    private RecordMetadata valueOrError() throws ExecutionException {
+        if (this.result.error() != null)
+            throw new ExecutionException(this.result.error());
+        else
+            return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+    }
+
+    public long relativeOffset() {
+        return this.relativeOffset;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return this.result.completed();
+    }
+
+}

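One ProduceRequestResult is shared by every record batched for a partition, and each FutureRecordMetadata adds its relative offset to the batch's base offset. A sketch of how completion fans out (checked exceptions from get() are elided):

    ProduceRequestResult result = new ProduceRequestResult();
    FutureRecordMetadata first = new FutureRecordMetadata(result, 0);
    FutureRecordMetadata second = new FutureRecordMetadata(result, 1);
    // completing the shared result satisfies every future in the batch at once
    result.done(new TopicPartition("t", 0), 100L, null);
    assert first.get().offset() == 100L;   // baseOffset + relativeOffset
    assert second.get().offset() == 101L;
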
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
new file mode 100644
index 0000000..6d2188e
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
@@ -0,0 +1,55 @@
+package kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.clients.producer.ProducerRecord;
+import kafka.common.Cluster;
+import kafka.common.PartitionInfo;
+import kafka.common.utils.Utils;
+
+/**
+ * The default partitioning strategy:
+ * <ul>
+ * <li>If a partition is specified in the record, use it
+ * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
+ * <li>If no partition or key is present choose a partition in a round-robin fashion
+ */
+public class Partitioner {
+
+    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+    /**
+     * Compute the partition for the given record.
+     * 
+     * @param record The record being sent
+     * @param cluster The current cluster metadata, including the partitions for the record's topic
+     */
+    public int partition(ProducerRecord record, Cluster cluster) {
+        List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
+        int numPartitions = partitions.size();
+        if (record.partition() != null) {
+            // they have given us a partition, use it
+            if (record.partition() < 0 || record.partition() >= numPartitions)
+                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+                                                   + " is not in the range [0..."
+                                                   + (numPartitions - 1)
+                                                   + "].");
+            return record.partition();
+        } else if (record.key() == null) {
+            // choose the next available partition in a round-robin fashion
+            for (int i = 0; i < numPartitions; i++) {
+                int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
+                if (partitions.get(partition).leader() != null)
+                    return partition;
+            }
+            // no partitions are available, give a non-available partition
+            return Utils.abs(counter.getAndIncrement()) % numPartitions;
+        } else {
+            // hash the key to choose a partition
+            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+        }
+    }
+
+}

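Because the keyed branch is a pure function of the key bytes, equal keys always map to the same partition for a fixed partition count. A test-style sketch, assuming a Cluster with partitions for topic "t" (construction omitted):

    Partitioner partitioner = new Partitioner();
    ProducerRecord a = new ProducerRecord("t", "k".getBytes(), "v1".getBytes());
    ProducerRecord b = new ProducerRecord("t", "k".getBytes(), "v2".getBytes());
    // same key => same partition, since the choice is murmur2(key) % numPartitions
    assert partitioner.partition(a, cluster) == partitioner.partition(b, cluster);
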
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
index 1049b61..cdae00a 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -3,14 +3,13 @@ package kafka.clients.producer.internals;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
 import kafka.common.TopicPartition;
-import kafka.common.errors.TimeoutException;
 
 /**
  * A class that models the future completion of a produce request for a single partition. There is one of these per
- * partition in a produce request and it is shared by all the {@link RecordSend} instances that are batched together for
- * the same partition in the request.
+ * partition in a produce request and it is shared by all the {@link RecordMetadata} instances that are batched together
+ * for the same partition in the request.
  */
 public final class ProduceRequestResult {
 
@@ -25,7 +24,7 @@ public final class ProduceRequestResult {
     /**
      * Mark this request as complete and unblock any threads waiting on its completion.
      * @param topicPartition The topic and partition to which this record set was sent
-     * @param baseOffset The base offset assigned to the message
+     * @param baseOffset The base offset assigned to the record
      * @param error The error that occurred if there was one, or null.
      */
     public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
@@ -38,12 +37,8 @@ public final class ProduceRequestResult {
     /**
      * Await the completion of this request
      */
-    public void await() {
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            throw new TimeoutException("Interrupted while waiting for request to complete.");
-        }
+    public void await() throws InterruptedException {
+        latch.await();
     }
 
     /**
@@ -52,16 +47,12 @@ public final class ProduceRequestResult {
      * @param unit The unit for the max time
      * @return true if the request completed, false if we timed out
      */
-    public boolean await(long timeout, TimeUnit unit) {
-        try {
-            return latch.await(timeout, unit);
-        } catch (InterruptedException e) {
-            throw new TimeoutException("Interrupted while waiting for request to complete.");
-        }
+    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+        return latch.await(timeout, unit);
     }
 
     /**
-     * The base offset for the request (the first offset in the message set)
+     * The base offset for the request (the first offset in the record set)
      */
     public long baseOffset() {
         return baseOffset;
@@ -75,6 +66,13 @@ public final class ProduceRequestResult {
     }
 
     /**
+     * The topic and partition to which the record was appended
+     */
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    /**
      * Has the request completed?
      */
     public boolean completed() {
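
Because await() now lets InterruptedException propagate rather than wrapping it, callers
decide how to handle interruption themselves. A hedged sketch of a bounded wait (the
helper class, its name, and the 30-second timeout are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit;

    import kafka.clients.producer.internals.ProduceRequestResult;

    public class AwaitSketch {
        // wait up to 30 seconds for the produce request, restoring the
        // interrupt flag if the waiting thread is interrupted
        static long awaitBaseOffset(ProduceRequestResult result) {
            try {
                if (!result.await(30, TimeUnit.SECONDS))
                    throw new IllegalStateException("produce request timed out");
                return result.baseOffset();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }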

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
index a2b536c..c22939f 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
@@ -11,7 +11,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
 import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
 import kafka.common.TopicPartition;
 import kafka.common.metrics.Measurable;
 import kafka.common.metrics.MetricConfig;
@@ -67,21 +66,21 @@ public final class RecordAccumulator {
 
     private void registerMetrics(Metrics metrics) {
         metrics.addMetric("blocked_threads",
-                          "The number of user threads blocked waiting for buffer memory to enqueue their messages",
+                          "The number of user threads blocked waiting for buffer memory to enqueue their records",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.queued();
                               }
                           });
         metrics.addMetric("buffer_total_bytes",
-                          "The total amount of buffer memory that is available (not currently used for buffering messages).",
+                          "The total amount of buffer memory that is available (not currently used for buffering records).",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.totalMemory();
                               }
                           });
         metrics.addMetric("buffer_available_bytes",
-                          "The total amount of buffer memory that is available (not currently used for buffering messages).",
+                          "The total amount of buffer memory that is available (not currently used for buffering records).",
                           new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.availableMemory();
@@ -100,7 +99,7 @@ public final class RecordAccumulator {
      * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordSend append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch
@@ -108,9 +107,9 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch batch = dq.peekLast();
             if (batch != null) {
-                RecordSend send = batch.tryAppend(key, value, compression, callback);
-                if (send != null)
-                    return send;
+                FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+                if (future != null)
+                    return future;
             }
         }
 
@@ -120,19 +119,18 @@ public final class RecordAccumulator {
         synchronized (dq) {
             RecordBatch first = dq.peekLast();
             if (first != null) {
-                RecordSend send = first.tryAppend(key, value, compression, callback);
-                if (send != null) {
-                    // somebody else found us a batch, return the one we waited for!
-                    // Hopefully this doesn't happen
+                FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
+                if (future != null) {
+                    // Somebody else created a batch while we were allocating; use theirs and free ours. Hopefully this doesn't happen
                     // often...
                     free.deallocate(buffer);
-                    return send;
+                    return future;
                 }
             }
             RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
-            RecordSend send = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
             dq.addLast(batch);
-            return send;
+            return future;
         }
     }
 
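The append path above follows a try/allocate/retry shape: try the open batch under the
queue lock, allocate memory outside the lock (which may block waiting for buffer space),
then re-check before creating a new batch since another thread may have raced us. The
same idiom in isolation, with StringBuilder batches standing in for RecordBatch:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class BatchingSketch {
        private final Deque<StringBuilder> batches = new ArrayDeque<StringBuilder>();
        private static final int BATCH_CAPACITY = 64;

        public void append(String item) {
            synchronized (batches) {
                StringBuilder last = batches.peekLast();
                if (last != null && last.length() + item.length() <= BATCH_CAPACITY) {
                    last.append(item);                 // fast path: room in the open batch
                    return;
                }
            }
            // Slow path: "allocate" outside the lock, then re-check in case
            // another thread opened a fresh batch while we were allocating.
            StringBuilder fresh = new StringBuilder(BATCH_CAPACITY);
            synchronized (batches) {
                StringBuilder last = batches.peekLast();
                if (last != null && last.length() + item.length() <= BATCH_CAPACITY) {
                    last.append(item);                 // someone beat us to it;
                    return;                            // the fresh buffer is simply dropped
                }
                fresh.append(item);
                batches.addLast(fresh);
            }
        }
    }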

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
index 4a536a2..633f4af 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
@@ -4,7 +4,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
 import kafka.common.TopicPartition;
 import kafka.common.record.CompressionType;
 import kafka.common.record.MemoryRecords;
@@ -31,19 +31,20 @@ public final class RecordBatch {
     }
 
     /**
-     * Append the message to the current message set and return the relative offset within that message set
+     * Append the record to the current record set and return the relative offset within that record set
      * 
-     * @return The RecordSend corresponding to this message or null if there isn't sufficient room.
+     * @return The FutureRecordMetadata corresponding to this record, or null if there isn't sufficient room.
      */
-    public RecordSend tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
         if (!this.records.hasRoomFor(key, value)) {
             return null;
         } else {
             this.records.append(0L, key, value, compression);
-            RecordSend send = new RecordSend(this.recordCount++, this.produceFuture);
+            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
-                thunks.add(new Thunk(callback, send));
-            return send;
+                thunks.add(new Thunk(callback, this.recordCount));
+            this.recordCount++;
+            return future;
         }
     }
 
@@ -58,7 +59,12 @@ public final class RecordBatch {
         // execute callbacks
         for (int i = 0; i < this.thunks.size(); i++) {
             try {
-                this.thunks.get(i).execute();
+                Thunk thunk = this.thunks.get(i);
+                if (exception == null)
+                    thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
+                                                null);
+                else
+                    thunk.callback.onCompletion(null, exception);
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -70,15 +76,11 @@ public final class RecordBatch {
      */
     final private static class Thunk {
         final Callback callback;
-        final RecordSend send;
+        final long relativeOffset;
 
-        public Thunk(Callback callback, RecordSend send) {
+        public Thunk(Callback callback, long relativeOffset) {
             this.callback = callback;
-            this.send = send;
-        }
-
-        public void execute() {
-            this.callback.onCompletion(this.send);
+            this.relativeOffset = relativeOffset;
         }
     }
 }
\ No newline at end of file
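
With the two-argument callback above, exactly one of metadata and exception is non-null,
so user code branches on the exception rather than probing a RecordSend for errors. A
minimal callback against the new signature:

    import kafka.clients.producer.Callback;
    import kafka.clients.producer.RecordMetadata;

    public class LoggingCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null)
                exception.printStackTrace();               // the send failed
            else
                System.out.println("acked: " + metadata);  // the send succeeded
        }
    }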

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
index effeb9c..5ac487b 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
@@ -283,7 +283,8 @@ public class Sender implements Runnable {
 
     private void handleMetadataResponse(Struct body, long now) {
         this.metadataFetchInProgress = false;
-        this.metadata.update(ProtoUtils.parseMetadataResponse(body), now);
+        Cluster cluster = ProtoUtils.parseMetadataResponse(body);
+        this.metadata.update(cluster, now);
     }
 
     /**
@@ -377,7 +378,7 @@ public class Sender implements Runnable {
                 buffer.flip();
                 Struct part = topicData.instance("data")
                                        .set("partition", parts.get(i).topicPartition.partition())
-                                       .set("message_set", buffer);
+                                       .set("record_set", buffer);
                 partitionData[i] = part;
             }
             topicData.set("data", partitionData);

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
index 7331b73..973eb5e 100644
--- a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
@@ -7,47 +7,42 @@ import kafka.clients.producer.Callback;
 import kafka.clients.producer.KafkaProducer;
 import kafka.clients.producer.ProducerConfig;
 import kafka.clients.producer.ProducerRecord;
-import kafka.clients.producer.RecordSend;
-import kafka.common.ByteSerialization;
+import kafka.clients.producer.RecordMetadata;
+import kafka.common.record.Records;
 
 public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
         if (args.length != 3) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_messages message_size");
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
             System.exit(1);
         }
         String url = args[0];
-        int numMessages = Integer.parseInt(args[1]);
-        int messageSize = Integer.parseInt(args[2]);
+        int numRecords = Integer.parseInt(args[1]);
+        int recordSize = Integer.parseInt(args[2]);
         Properties props = new Properties();
         props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
         props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
         props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
         props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
 
         KafkaProducer producer = new KafkaProducer(props);
         Callback callback = new Callback() {
-            public void onCompletion(RecordSend send) {
-                try {
-                    send.offset();
-                } catch (Exception e) {
+            public void onCompletion(RecordMetadata metadata, Exception e) {
+                if (e != null)
                     e.printStackTrace();
-                }
             }
         };
-        byte[] payload = new byte[messageSize];
+        byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
         ProducerRecord record = new ProducerRecord("test", payload);
         long start = System.currentTimeMillis();
         long maxLatency = -1L;
         long totalLatency = 0;
         int reportingInterval = 1000000;
-        for (int i = 0; i < numMessages; i++) {
+        for (int i = 0; i < numRecords; i++) {
             long sendStart = System.currentTimeMillis();
-            producer.send(record, null);
+            producer.send(record, callback);
             long sendEllapsed = System.currentTimeMillis() - sendStart;
             maxLatency = Math.max(maxLatency, sendEllapsed);
             totalLatency += sendEllapsed;
@@ -61,9 +56,9 @@ public class ProducerPerformance {
             }
         }
         long ellapsed = System.currentTimeMillis() - start;
-        double msgsSec = 1000.0 * numMessages / (double) ellapsed;
-        double mbSec = msgsSec * messageSize / (1024.0 * 1024.0);
-        System.out.printf("%d messages sent in %d ms ms. %.2f messages per second (%.2f mb/sec).", numMessages, ellapsed, msgsSec, mbSec);
+        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
         producer.close();
     }
 
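As a worked example of the throughput math, under the assumption that LOG_OVERHEAD is
the per-record log framing (an 8-byte offset plus a 4-byte size, i.e. 12 bytes); the
numbers below are hypothetical:

    public class ThroughputMath {
        public static void main(String[] args) {
            long numRecords = 1000000;   // hypothetical: one million records
            int recordSize = 100;        // 100-byte payloads
            long ellapsed = 10000;       // sent in 10 seconds
            int logOverhead = 12;        // assumed: 8-byte offset + 4-byte size per record
            double msgsSec = 1000.0 * numRecords / (double) ellapsed;
            double mbSec = msgsSec * (recordSize + logOverhead) / (1024.0 * 1024.0);
            System.out.printf("%.2f records/sec, %.2f MB/sec%n", msgsSec, mbSec);
            // prints: 100000.00 records/sec, 10.68 MB/sec
        }
    }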

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/ByteSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/ByteSerialization.java b/clients/src/main/java/kafka/common/ByteSerialization.java
deleted file mode 100644
index eca69f1..0000000
--- a/clients/src/main/java/kafka/common/ByteSerialization.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A serialization implementation that just retains the provided byte array unchanged
- */
-public class ByteSerialization implements Serializer, Deserializer {
-
-    @Override
-    public Object fromBytes(byte[] bytes) {
-        return bytes;
-    }
-
-    @Override
-    public byte[] toBytes(Object o) {
-        return (byte[]) o;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Cluster.java b/clients/src/main/java/kafka/common/Cluster.java
index d0acd8d..8d045d5 100644
--- a/clients/src/main/java/kafka/common/Cluster.java
+++ b/clients/src/main/java/kafka/common/Cluster.java
@@ -12,13 +12,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import kafka.common.utils.Utils;
 
 /**
- * A representation of the nodes, topics, and partitions in the Kafka cluster
+ * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
  */
 public final class Cluster {
 
     private final AtomicInteger counter = new AtomicInteger(0);
     private final List<Node> nodes;
-    private final Map<Integer, Node> nodesById;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
 
@@ -28,22 +27,28 @@ public final class Cluster {
      * @param partitions Information about a subset of the topic-partitions this cluster hosts
      */
     public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
-        this.nodes = new ArrayList<Node>(nodes);
-        this.nodesById = new HashMap<Integer, Node>(this.nodes.size());
-        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partitions.size());
+        // make a randomized, unmodifiable copy of the nodes
+        List<Node> copy = new ArrayList<Node>(nodes);
+        Collections.shuffle(copy);
+        this.nodes = Collections.unmodifiableList(copy);
 
-        Collections.shuffle(this.nodes);
-        for (Node n : this.nodes)
-            this.nodesById.put(n.id(), n);
+        // index the partitions by topic/partition for quick lookup
+        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
         for (PartitionInfo p : partitions)
             this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+
+        // index the partitions by topic and make the lists unmodifiable so we can hand them out in
+        // user-facing APIs without risk of the client modifying the contents
+        HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
         for (PartitionInfo p : partitions) {
-            if (!this.partitionsByTopic.containsKey(p.topic()))
-                this.partitionsByTopic.put(p.topic(), new ArrayList<PartitionInfo>());
-            List<PartitionInfo> ps = this.partitionsByTopic.get(p.topic());
+            if (!parts.containsKey(p.topic()))
+                parts.put(p.topic(), new ArrayList<PartitionInfo>());
+            List<PartitionInfo> ps = parts.get(p.topic());
             ps.add(p);
         }
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
+            this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
     }
 
     /**
@@ -67,6 +72,13 @@ public final class Cluster {
     }
 
     /**
+     * @return The known set of nodes
+     */
+    public List<Node> nodes() {
+        return this.nodes;
+    }
+
+    /**
      * Get the current leader for the given topic-partition
      * @param topicPartition The topic and partition we want to know the leader for
      * @return The node that is the leader for this topic-partition, or null if there is currently no leader
@@ -76,7 +88,16 @@ public final class Cluster {
         if (info == null)
             return null;
         else
-            return nodesById.get(info.leader());
+            return info.leader();
+    }
+
+    /**
+     * Get the metadata for the specified partition
+     * @param topicPartition The topic and partition to fetch info for
+     * @return The metadata about the given topic and partition
+     */
+    public PartitionInfo partition(TopicPartition topicPartition) {
+        return partitionsByTopicPartition.get(topicPartition);
     }
 
     /**
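
The constructor now builds each per-topic list once and freezes it with
Collections.unmodifiableList, so partitionsFor() can return the internal list directly
without defensive copying. The same index-then-freeze idiom in isolation (a generic
sketch, not the Cluster code itself):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IndexSketch {
        // group values by key, then freeze each list before exposing the index
        // (assumes non-empty strings, keyed here by their first character)
        static Map<String, List<String>> indexByPrefix(List<String> items) {
            Map<String, List<String>> index = new HashMap<String, List<String>>();
            for (String item : items) {
                String key = item.substring(0, 1);
                if (!index.containsKey(key))
                    index.put(key, new ArrayList<String>());
                index.get(key).add(item);
            }
            // freeze each list so callers cannot mutate the shared index
            for (Map.Entry<String, List<String>> e : index.entrySet())
                e.setValue(Collections.unmodifiableList(e.getValue()));
            return index;
        }
    }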

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Deserializer.java b/clients/src/main/java/kafka/common/Deserializer.java
deleted file mode 100644
index ad2e784..0000000
--- a/clients/src/main/java/kafka/common/Deserializer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- * <p>
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Deserializer {
-
-    /**
-     * Map a byte[] to an object
-     * @param bytes The bytes for the object (can be null)
-     * @return The deserialized object (can return null)
-     */
-    public Object fromBytes(byte[] bytes);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/PartitionInfo.java b/clients/src/main/java/kafka/common/PartitionInfo.java
index f3f08dd..0e50ed7 100644
--- a/clients/src/main/java/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/kafka/common/PartitionInfo.java
@@ -7,11 +7,11 @@ public class PartitionInfo {
 
     private final String topic;
     private final int partition;
-    private final int leader;
-    private final int[] replicas;
-    private final int[] inSyncReplicas;
+    private final Node leader;
+    private final Node[] replicas;
+    private final Node[] inSyncReplicas;
 
-    public PartitionInfo(String topic, int partition, int leader, int[] replicas, int[] inSyncReplicas) {
+    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
         this.topic = topic;
         this.partition = partition;
         this.leader = leader;
@@ -36,14 +36,14 @@ public class PartitionInfo {
     /**
      * The node currently acting as leader for this partition, or null if there is no leader
      */
-    public int leader() {
+    public Node leader() {
         return leader;
     }
 
     /**
      * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
      */
-    public int[] replicas() {
+    public Node[] replicas() {
         return replicas;
     }
 
@@ -51,7 +51,7 @@ public class PartitionInfo {
      * The subset of the replicas that are in sync, that is, caught up to the leader and ready to take over as leader if
      * the leader should fail
      */
-    public int[] inSyncReplicas() {
+    public Node[] inSyncReplicas() {
         return inSyncReplicas;
     }
 
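Since the leader and replicas are now Node objects rather than bare ids, callers can
relate them directly. A small sketch using only the accessors above; reference identity
is assumed to be sufficient here (i.e. that the same Node instances are shared), which
this patch does not guarantee:

    import kafka.common.Node;
    import kafka.common.PartitionInfo;

    public class LeaderCheckSketch {
        // does this partition currently have a leader that is also in sync?
        static boolean leaderIsInSync(PartitionInfo info) {
            Node leader = info.leader();   // a Node now, or null when leaderless
            if (leader == null)
                return false;
            for (Node replica : info.inSyncReplicas())
                if (replica == leader)     // identity comparison; assumes shared instances
                    return true;
            return false;
        }
    }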

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Serializer.java b/clients/src/main/java/kafka/common/Serializer.java
deleted file mode 100644
index 63353d8..0000000
--- a/clients/src/main/java/kafka/common/Serializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- * <p>
- * An implementation should handle null inputs.
- * <p>
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Serializer {
-
-    /**
-     * Translate an object into bytes. The serializer must handle null inputs, and will generally just return null in
-     * this case.
-     * @param o The object to serialize, can be null
-     * @return The serialized bytes for the object or null
-     */
-    public byte[] toBytes(Object o);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/StringSerialization.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/StringSerialization.java b/clients/src/main/java/kafka/common/StringSerialization.java
deleted file mode 100644
index c0ed5ca..0000000
--- a/clients/src/main/java/kafka/common/StringSerialization.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka.common;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-/**
- * A serializer and deserializer for strings.
- * <p>
- * This class accepts a configuration parameter string.encoding which can take the string name of any supported
- * encoding. If no encoding is specified the default will be UTF-8.
- */
-public class StringSerialization implements Serializer, Deserializer, Configurable {
-
-    private final static String ENCODING_CONFIG = "string.encoding";
-
-    private String encoding;
-
-    public StringSerialization(String encoding) {
-        super();
-        this.encoding = encoding;
-    }
-
-    public StringSerialization() {
-        this("UTF8");
-    }
-
-    public void configure(Map<String, ?> configs) {
-        if (configs.containsKey(ENCODING_CONFIG))
-            this.encoding = (String) configs.get(ENCODING_CONFIG);
-    }
-
-    @Override
-    public Object fromBytes(byte[] bytes) {
-        if (bytes == null) {
-            return null;
-        } else {
-            try {
-                return new String(bytes, encoding);
-            } catch (UnsupportedEncodingException e) {
-                throw new KafkaException(e);
-            }
-        }
-    }
-
-    @Override
-    public byte[] toBytes(Object o) {
-        if (o == null) {
-            return null;
-        } else {
-            try {
-                return ((String) o).getBytes(encoding);
-            } catch (UnsupportedEncodingException e) {
-                throw new KafkaException(e);
-            }
-        }
-    }
-
-}
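
With the Serializer/Deserializer interfaces and their String/Byte implementations
removed, serialization becomes the caller's job: records carry raw byte[] values and
applications encode strings themselves. A hedged sketch (the two-argument
ProducerRecord(topic, value) constructor is the one ProducerPerformance uses above):

    import java.io.UnsupportedEncodingException;

    import kafka.clients.producer.ProducerRecord;

    public class StringRecordSketch {
        public static void main(String[] args) throws UnsupportedEncodingException {
            // the caller encodes to bytes (UTF-8 here) before building the record
            byte[] value = "hello, kafka".getBytes("UTF-8");
            ProducerRecord record = new ProducerRecord("test", value);
            System.out.println(record);
        }
    }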

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java b/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
deleted file mode 100644
index faf6234..0000000
--- a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class CorruptMessageException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public CorruptMessageException() {
-        super("This message has failed it's CRC checksum or is otherwise corrupt.");
-    }
-
-    public CorruptMessageException(String message) {
-        super(message);
-    }
-
-    public CorruptMessageException(Throwable cause) {
-        super(cause);
-    }
-
-    public CorruptMessageException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
new file mode 100644
index 0000000..492f2e3
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class CorruptRecordException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public CorruptRecordException() {
+        super("This message has failed it's CRC checksum or is otherwise corrupt.");
+    }
+
+    public CorruptRecordException(String message) {
+        super(message);
+    }
+
+    public CorruptRecordException(Throwable cause) {
+        super(cause);
+    }
+
+    public CorruptRecordException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java b/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
deleted file mode 100644
index 7417906..0000000
--- a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class MessageTooLargeException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public MessageTooLargeException() {
-        super();
-    }
-
-    public MessageTooLargeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MessageTooLargeException(String message) {
-        super(message);
-    }
-
-    public MessageTooLargeException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
new file mode 100644
index 0000000..bef4293
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class RecordTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RecordTooLargeException() {
+        super();
+    }
+
+    public RecordTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RecordTooLargeException(String message) {
+        super(message);
+    }
+
+    public RecordTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+}