Posted to commits@kafka.apache.org by jj...@apache.org on 2015/05/28 22:46:47 UTC

[1/2] kafka git commit: KAFKA-2186; Follow-up to KAFKA-1650 - add selective offset commit to consumer connector API; reviewed by Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk c60f32501 -> d6c45c70f


KAFKA-2186; Follow-up to KAFKA-1650 - add selective offset commit to consumer connector API; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05e8a781
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05e8a781
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05e8a781

Branch: refs/heads/trunk
Commit: 05e8a78145fc3db673bb46be8ba1788558c5a26d
Parents: c60f325
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu May 28 13:22:30 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 28 13:22:30 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/javaapi/consumer/ConsumerConnector.java  | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05e8a781/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index cc3400f..ca74ca8 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -20,6 +20,9 @@ package kafka.javaapi.consumer;
 
 import java.util.List;
 import java.util.Map;
+
+import kafka.common.OffsetAndMetadata;
+import kafka.common.TopicAndPartition;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.serializer.Decoder;
@@ -65,6 +68,14 @@ public interface ConsumerConnector {
   public void commitOffsets(boolean retryOnFailure);
 
   /**
+   *  Commit offsets using the provided offsets map
+   *
+   *  @param offsetsToCommit a map containing the offset to commit for each partition.
+   *  @param retryOnFailure whether to retry the offset commit if it fails.
+   */
+  public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
+
+  /**
    *  Shut down the connector
    */
   public void shutdown();
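
For the new selective-commit overload above, a minimal usage sketch of the old high-level consumer's Java API follows. The topic name, group id and the OffsetAndMetadata construction are illustrative assumptions; in particular, the exact kafka.common.OffsetAndMetadata factory/constructor has changed between Kafka versions, so adjust it to your build.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import kafka.common.OffsetAndMetadata;
    import kafka.common.TopicAndPartition;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class SelectiveCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");   // illustrative
            props.put("group.id", "selective-commit-group");    // illustrative
            props.put("auto.commit.enable", "false");           // we commit manually below
            ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // Commit only the partitions this consumer has finished processing.
            Map<TopicAndPartition, OffsetAndMetadata> toCommit =
                new HashMap<TopicAndPartition, OffsetAndMetadata>();
            // Assumption: the (offset, metadata) factory shown here is illustrative; check the
            // OffsetAndMetadata signature in your Kafka version.
            toCommit.put(new TopicAndPartition("my-topic", 0), OffsetAndMetadata.apply(42L, "worker-1"));

            connector.commitOffsets(toCommit, true); // retry the commit if it fails
            connector.shutdown();
        }
    }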


[2/2] kafka git commit: KAFKA-2091; Expose a partitioner interface in the new producer (https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer); reviewed by Joel Koshy and Jay Kreps

Posted by jj...@apache.org.
KAFKA-2091; Expose a partitioner interface in the new producer
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer);
reviewed by Joel Koshy and Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6c45c70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6c45c70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6c45c70

Branch: refs/heads/trunk
Commit: d6c45c70fb9773043766446e88370db9709e7995
Parents: 05e8a78
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu May 28 13:27:05 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 28 13:27:12 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 82 +++++++++++-------
 .../kafka/clients/producer/MockProducer.java    | 40 ++++++---
 .../kafka/clients/producer/Partitioner.java     | 46 ++++++++++
 .../kafka/clients/producer/ProducerConfig.java  | 14 ++-
 .../producer/internals/DefaultPartitioner.java  | 89 ++++++++++++++++++++
 .../clients/producer/internals/Partitioner.java | 89 --------------------
 .../internals/DefaultPartitionerTest.java       | 63 ++++++++++++++
 .../producer/internals/PartitionerTest.java     | 68 ---------------
 8 files changed, 289 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8e336a3..ded19d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
@@ -73,11 +72,11 @@ import org.slf4j.LoggerFactory;
  * props.put("buffer.memory", 33554432);
  * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- * 
+ *
  * Producer<String, String> producer = new KafkaProducer(props);
  * for(int i = 0; i < 100; i++)
  *     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
- * 
+ *
  * producer.close();
  * }</pre>
  * <p>
@@ -92,25 +91,25 @@ import org.slf4j.LoggerFactory;
  * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
  * <p>
  * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on 
+ * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
  * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
  * <p>
- * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by 
+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
  * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
  * generally have one of these buffers for each active partition).
  * <p>
- * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you 
+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
  * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
- * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will 
- * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, 
- * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting 
- * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that 
- * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load 
+ * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
+ * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
+ * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
+ * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
  * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
  * efficient requests when not under maximal load at the cost of a small amount of latency.
  * <p>
  * The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records
- * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is 
+ * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
  * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which
  * will cause the send call to result in an exception.
  * <p>
@@ -207,7 +206,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     MetricsReporter.class);
             reporters.add(new JmxReporter(jmxPrefix));
             this.metrics = new Metrics(metricConfig, reporters, time);
-            this.partitioner = new Partitioner();
+            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
@@ -285,7 +284,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
-     * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. 
+     * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
      * See {@link #send(ProducerRecord, Callback)} for details.
      */
     @Override
@@ -309,7 +308,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * or throw any exception that occurred while sending the record.
      * <p>
      * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
-     * 
+     *
      * <pre>
      * {@code
      * byte[] key = "key".getBytes();
@@ -320,7 +319,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * <p>
      * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
-     * 
+     *
      * <pre>
      * {@code
      * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
@@ -334,10 +333,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *               });
      * }
      * </pre>
-     * 
+     *
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
-     * 
+     *
      * <pre>
      * {@code
      * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
@@ -349,15 +348,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * 
+     *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
-     *        
+     *
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
      * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
-     * 
+     *
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -380,7 +379,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                         " specified in value.serializer");
             }
-            int partition = partitioner.partition(record.topic(), serializedKey, record.partition(), metadata.fetch());
+            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -452,12 +451,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
-    
+
     /**
-     * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is 
+     * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
      * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
-     * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>). 
-     * A request is considered completed when it is successfully acknowledged 
+     * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
+     * A request is considered completed when it is successfully acknowledged
      * according to the <code>acks</code> configuration you have specified or else it results in an error.
      * <p>
      * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
@@ -475,10 +474,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * consumer.commit();
      * }
      * </pre>
-     * 
+     *
      * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
      * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
-     * 
+     *
      * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
@@ -550,7 +549,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void close(long timeout, TimeUnit timeUnit) {
         close(timeout, timeUnit, false);
     }
-    
+
     private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
         if (timeout < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
@@ -600,6 +599,27 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             throw new KafkaException("Failed to close kafka producer", firstException.get());
     }
 
+    /**
+     * Computes the partition for the given record. If the record specifies a
+     * partition, that value is used after a range check; otherwise the configured
+     * partitioner class is invoked to compute the partition.
+     */
+    private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
+        Integer partition = record.partition();
+        if (partition != null) {
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+            int numPartitions = partitions.size();
+            // they have given us a partition, use it
+            if (partition < 0 || partition >= numPartitions)
+                throw new IllegalArgumentException("Invalid partition given with record: " + partition
+                                                   + " is not in the range [0..."
+                                                   + numPartitions
+                                                   + "].");
+            return partition;
+        }
+        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
+    }
+
     private static class FutureFailure implements Future<RecordMetadata> {
 
         private final ExecutionException exception;
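
The new private partition(...) helper above gives an explicit partition on the record precedence over the configured partitioner: if the record carries a partition it is only range-checked, otherwise the pluggable Partitioner decides. A hedged sketch of what that means for callers (broker address and topic name are illustrative):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionPrecedenceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);

            // Explicit partition on the record: the partitioner is bypassed; the value is
            // only checked against the topic's partition count.
            producer.send(new ProducerRecord<String, String>("my-topic", 1, "key", "value"));

            // No partition on the record: the class configured via partitioner.class
            // (DefaultPartitioner unless overridden) chooses the partition.
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));

            producer.close();
        }
    }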

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3c34610..e66491c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -27,7 +27,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
-import org.apache.kafka.clients.producer.internals.Partitioner;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.*;
 
@@ -41,7 +41,7 @@ import org.apache.kafka.common.*;
 public class MockProducer implements Producer<byte[], byte[]> {
 
     private final Cluster cluster;
-    private final Partitioner partitioner = new Partitioner();
+    private final Partitioner partitioner = new DefaultPartitioner();
     private final List<ProducerRecord<byte[], byte[]>> sent;
     private final Deque<Completion> completions;
     private boolean autoComplete;
@@ -49,7 +49,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Create a mock producer
-     * 
+     *
      * @param cluster The cluster holding metadata for this producer
      * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
      *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
@@ -66,7 +66,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Create a new mock producer with invented metadata the given autoComplete setting.
-     * 
+     *
      * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)}
      */
     public MockProducer(boolean autoComplete) {
@@ -75,7 +75,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Create a new auto completing mock producer
-     * 
+     *
      * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
      */
     public MockProducer() {
@@ -94,14 +94,14 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Adds the record to the list of sent records.
-     * 
+     *
      * @see #history()
      */
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
         int partition = 0;
         if (this.cluster.partitionsForTopic(record.topic()) != null)
-            partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster);
+            partition = partition(record, this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
         FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
@@ -129,7 +129,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
             return offset;
         }
     }
-    
+
     public synchronized void flush() {
         while (!this.completions.isEmpty())
             completeNext();
@@ -168,7 +168,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Complete the earliest uncompleted call successfully.
-     * 
+     *
      * @return true if there was an uncompleted call to complete
      */
     public synchronized boolean completeNext() {
@@ -177,7 +177,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
 
     /**
      * Complete the earliest uncompleted call with the given error.
-     * 
+     *
      * @return true if there was an uncompleted call to complete
      */
     public synchronized boolean errorNext(RuntimeException e) {
@@ -190,6 +190,26 @@ public class MockProducer implements Producer<byte[], byte[]> {
         }
     }
 
+    /**
+     * Computes the partition for the given record.
+     */
+    private int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
+        Integer partition = record.partition();
+        if (partition != null) {
+            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+            int numPartitions = partitions.size();
+            // they have given us a partition, use it
+            if (partition < 0 || partition >= numPartitions)
+                throw new IllegalArgumentException("Invalid partition given with record: " + partition
+                                                   + " is not in the range [0..."
+                                                   + numPartitions
+                                                   + "].");
+            return partition;
+        }
+        return this.partitioner.partition(record.topic(), null, record.key(), null, record.value(), cluster);
+    }
+
+
     private static class Completion {
         private final long offset;
         private final RecordMetadata metadata;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
new file mode 100644
index 0000000..383619d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Partitioner Interface
+ */
+
+public interface Partitioner extends Configurable {
+
+    /**
+     * Compute the partition for the given record.
+     *
+     * @param topic The topic name
+     * @param key The key to partition on (or null if no key)
+     * @param keyBytes The serialized key to partition on (or null if no key)
+     * @param value The value to partition on or null
+     * @param valueBytes The serialized value to partition on or null
+     * @param cluster The current cluster metadata
+     */
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
+
+    /**
+     * This is called when partitioner is closed.
+     */
+    public void close();
+
+}
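
To make the new public interface concrete, here is a hedged sketch of a custom partitioner. The class name and routing rule (keys starting with "audit" pinned to partition 0) are invented for the example; only the Partitioner/Configurable method signatures come from the interface above.

    package com.example; // hypothetical package for the sketch

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    public class AuditAwarePartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // Pin "audit" keys (and keyless records, for simplicity) to partition 0.
            if (keyBytes == null || (key instanceof String && ((String) key).startsWith("audit")))
                return 0;
            // Otherwise hash the serialized key, masking the sign bit as DefaultPartitioner does.
            return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {}
    }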

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 187d000..023bd2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -51,7 +51,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
-            
+
     /** <code>batch.size</code> */
     public static final String BATCH_SIZE_CONFIG = "batch.size";
     private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
@@ -169,6 +169,11 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
 
+    /** <code>partitioner.class</code> */
+    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
+    private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface.";
+
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -217,7 +222,8 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
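
With the new partitioner.class key defined above (defaulting to the internal DefaultPartitioner), plugging in a custom implementation is a one-line configuration change. A brief fragment, continuing the hypothetical class from the Partitioner example (imports as in the earlier producer sketch):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // illustrative
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // New in this change; omit it to fall back to DefaultPartitioner.
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.AuditAwarePartitioner");
    Producer<String, String> producer = new KafkaProducer<String, String>(props);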

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
new file mode 100644
index 0000000..f81c496
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * The default partitioning strategy:
+ * <ul>
+ * <li>If a partition is specified in the record, use it
+ * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
+ * <li>If no partition or key is present choose a partition in a round-robin fashion
+ */
+public class DefaultPartitioner implements Partitioner {
+
+    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+    /**
+     * A cheap way to deterministically convert a number to a positive value. When the input is
+     * positive, the original value is returned. When the input number is negative, the returned
+     * positive value is the original value bitwise ANDed with 0x7fffffff, which is
+     * not the same as its absolute value.
+     *
+     * Note: changing this method in the future will possibly cause partition selection not to be
+     * compatible with the existing messages already placed on a partition.
+     *
+     * @param number a given number
+     * @return a positive number.
+     */
+    private static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+
+    public void configure(Map<String, ?> configs) {}
+
+    /**
+     * Compute the partition for the given record.
+     *
+     * @param topic The topic name
+     * @param key The key to partition on (or null if no key)
+     * @param keyBytes serialized key to partition on (or null if no key)
+     * @param value The value to partition on or null
+     * @param valueBytes serialized value to partition on or null
+     * @param cluster The current cluster metadata
+     */
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+        int numPartitions = partitions.size();
+        if (keyBytes == null) {
+            int nextValue = counter.getAndIncrement();
+            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+            if (availablePartitions.size() > 0) {
+                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
+                return availablePartitions.get(part).partition();
+            } else {
+                // no partitions are available, give a non-available partition
+                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
+            }
+        } else {
+            // hash the keyBytes to choose a partition
+            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+        }
+    }
+
+    public void close() {}
+
+}
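
The toPositive note above deserves a concrete illustration: masking with 0x7fffffff always yields a non-negative int, while Math.abs does not (it overflows for Integer.MIN_VALUE), which is why the key hash and the round-robin counter are masked rather than passed through Math.abs. A small self-contained check:

    public class ToPositiveCheck {
        public static void main(String[] args) {
            System.out.println(Math.abs(Integer.MIN_VALUE));    // -2147483648: abs overflows here
            System.out.println(Integer.MIN_VALUE & 0x7fffffff); // 0: the mask is always non-negative
            System.out.println(-5 & 0x7fffffff);                // 2147483643: not the absolute value of -5
        }
    }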

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
deleted file mode 100644
index 93e7991..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.utils.Utils;
-
-/**
- * The default partitioning strategy:
- * <ul>
- * <li>If a partition is specified in the record, use it
- * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
- * <li>If no partition or key is present choose a partition in a round-robin fashion
- */
-public class Partitioner {
-
-    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
-    /**
-     * A cheap way to deterministically convert a number to a positive value. When the input is 
-     * positive, the original value is returned. When the input number is negative, the returned
-     * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
-     * value.
-     * 
-     * Note: changing this method in the future will possibly cause partition selection not to be
-     * compatible with the existing messages already placed on a partition. 
-     * 
-     * @param number a given number
-     * @return a positive number.
-     */
-    private static int toPositive(int number) {
-        return number & 0x7fffffff; 
-    }
-
-    /**
-     * Compute the partition for the given record.
-     * 
-     * @param topic The topic name
-     * @param key The key to partition on (or null if no key)
-     * @param partition The partition to use (or null if none)
-     * @param cluster The current cluster metadata
-     */
-    public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {
-        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
-        int numPartitions = partitions.size();
-        if (partition != null) {
-            // they have given us a partition, use it
-            if (partition < 0 || partition >= numPartitions)
-                throw new IllegalArgumentException("Invalid partition given with record: " + partition
-                                                   + " is not in the range [0..."
-                                                   + numPartitions
-                                                   + "].");
-            return partition;
-        } else if (key == null) {
-            int nextValue = counter.getAndIncrement();
-            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-            if (availablePartitions.size() > 0) {
-                int part = Partitioner.toPositive(nextValue) % availablePartitions.size();
-                return availablePartitions.get(part).partition();
-            } else {
-                // no partitions are available, give a non-available partition
-                return Partitioner.toPositive(nextValue) % numPartitions;
-            }
-        } else {
-            // hash the key to choose a partition
-            return Partitioner.toPositive(Utils.murmur2(key)) % numPartitions;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
new file mode 100644
index 0000000..977fa93
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+
+public class DefaultPartitionerTest {
+    private byte[] keyBytes = "key".getBytes();
+    private Partitioner partitioner = new DefaultPartitioner();
+    private Node node0 = new Node(0, "localhost", 99);
+    private Node node1 = new Node(1, "localhost", 100);
+    private Node node2 = new Node(2, "localhost", 101);
+    private Node[] nodes = new Node[] {node0, node1, node2};
+    private String topic = "test";
+    // Intentionally make the partition list not in partition order to test the edge cases.
+    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+                                                    new PartitionInfo(topic, 2, node1, nodes, nodes),
+                                                    new PartitionInfo(topic, 0, node0, nodes, nodes));
+    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+
+    @Test
+    public void testKeyPartitionIsStable() {
+        int partition = partitioner.partition("test",  null, keyBytes, null, null, cluster);
+        assertEquals("Same key should yield same partition", partition, partitioner.partition("test", null, keyBytes, null, null, cluster));
+    }
+
+    @Test
+    public void testRoundRobinWithUnavailablePartitions() {
+        // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
+        // and (2) the available partitions are selected in a round robin way.
+        int countForPart0 = 0;
+        int countForPart2 = 0;
+        for (int i = 1; i <= 100; i++) {
+            int part = partitioner.partition("test", null, null, null, null, cluster);
+            assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
+            if (part == 0)
+                countForPart0++;
+            else
+                countForPart2++;
+        }
+        assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
deleted file mode 100644
index 5dadd0e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-
-public class PartitionerTest {
-
-    private byte[] key = "key".getBytes();
-    private Partitioner partitioner = new Partitioner();
-    private Node node0 = new Node(0, "localhost", 99);
-    private Node node1 = new Node(1, "localhost", 100);
-    private Node node2 = new Node(2, "localhost", 101);
-    private Node[] nodes = new Node[] {node0, node1, node2};
-    private String topic = "test";
-    // Intentionally make the partition list not in partition order to test the edge cases.
-    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
-                                                    new PartitionInfo(topic, 2, node1, nodes, nodes),
-                                                    new PartitionInfo(topic, 0, node0, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
-
-    @Test
-    public void testUserSuppliedPartitioning() {
-        assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
-    }
-
-    @Test
-    public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition("test", key, null, cluster);
-        assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
-    }
-
-    @Test
-    public void testRoundRobinWithUnavailablePartitions() {
-        // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
-        // and (2) the available partitions are selected in a round robin way.
-        int countForPart0 = 0;
-        int countForPart2 = 0;
-        for (int i = 1; i <= 100; i++) {
-            int part = partitioner.partition("test", null, null, cluster);
-            assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
-            if (part == 0)
-                countForPart0++;
-            else
-                countForPart2++;
-        }
-        assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
-    }
-}