You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/05/28 22:46:47 UTC
[1/2] kafka git commit: KAFKA-2186; Follow-up to KAFKA-1650 - add selective offset commit to consumer connector API; reviewed by Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/trunk c60f32501 -> d6c45c70f
KAFKA-2186; Follow-up to KAFKA-1650 - add selective offset commit to consumer connector API; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05e8a781
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05e8a781
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05e8a781
Branch: refs/heads/trunk
Commit: 05e8a78145fc3db673bb46be8ba1788558c5a26d
Parents: c60f325
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu May 28 13:22:30 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 28 13:22:30 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/javaapi/consumer/ConsumerConnector.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/05e8a781/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index cc3400f..ca74ca8 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -20,6 +20,9 @@ package kafka.javaapi.consumer;
import java.util.List;
import java.util.Map;
+
+import kafka.common.OffsetAndMetadata;
+import kafka.common.TopicAndPartition;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.serializer.Decoder;
@@ -65,6 +68,14 @@ public interface ConsumerConnector {
public void commitOffsets(boolean retryOnFailure);
/**
+ * Commit only the offsets contained in the provided map (selective offset commit).
+ *
+ * @param offsetsToCommit a map from each partition to the offset (with its metadata) to commit for it.
+ * @param retryOnFailure enable retries on the offset commit if it fails.
+ */
+ public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
+
+ /**
* Shut down the connector
*/
public void shutdown();
[2/2] kafka git commit: KAFKA-2091; Expose a partitioner interface in the new producer (https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer); reviewed by Joel Koshy and Jay Kreps
Posted by jj...@apache.org.
KAFKA-2091; Expose a partitioner interface in the new producer
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer);
reviewed by Joel Koshy and Jay Kreps
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d6c45c70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d6c45c70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d6c45c70
Branch: refs/heads/trunk
Commit: d6c45c70fb9773043766446e88370db9709e7995
Parents: 05e8a78
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Thu May 28 13:27:05 2015 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 28 13:27:12 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 82 +++++++++++-------
.../kafka/clients/producer/MockProducer.java | 40 ++++++---
.../kafka/clients/producer/Partitioner.java | 46 ++++++++++
.../kafka/clients/producer/ProducerConfig.java | 14 ++-
.../producer/internals/DefaultPartitioner.java | 89 ++++++++++++++++++++
.../clients/producer/internals/Partitioner.java | 89 --------------------
.../internals/DefaultPartitionerTest.java | 63 ++++++++++++++
.../producer/internals/PartitionerTest.java | 68 ---------------
8 files changed, 289 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8e336a3..ded19d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
@@ -73,11 +72,11 @@ import org.slf4j.LoggerFactory;
* props.put("buffer.memory", 33554432);
* props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
* props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- *
+ *
* Producer<String, String> producer = new KafkaProducer(props);
* for(int i = 0; i < 100; i++)
* producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
- *
+ *
* producer.close();
* }</pre>
* <p>
@@ -92,25 +91,25 @@ import org.slf4j.LoggerFactory;
* we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
* <p>
* If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
- * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
+ * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
* <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
* <p>
- * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
* the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
* generally have one of these buffers for each active partition).
* <p>
- * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
* want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
- * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
- * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
- * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
- * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
- * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
+ * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
+ * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
+ * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
+ * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
* batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
* efficient requests when not under maximal load at the cost of a small amount of latency.
* <p>
* The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records
- * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
+ * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
* exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which
* will cause the send call to result in an exception.
* <p>
@@ -207,7 +206,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
MetricsReporter.class);
reporters.add(new JmxReporter(jmxPrefix));
this.metrics = new Metrics(metricConfig, reporters, time);
- this.partitioner = new Partitioner();
+ this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
@@ -285,7 +284,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
/**
- * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
+ * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
@@ -309,7 +308,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* or throw any exception that occurred while sending the record.
* <p>
* If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
- *
+ *
* <pre>
* {@code
* byte[] key = "key".getBytes();
@@ -320,7 +319,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* <p>
* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
- *
+ *
* <pre>
* {@code
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
@@ -334,10 +333,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* });
* }
* </pre>
- *
+ *
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
- *
+ *
* <pre>
* {@code
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
@@ -349,15 +348,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* they will delay the sending of messages from other threads. If you want to execute blocking or computationally
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
- *
+ *
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
- *
+ *
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
- *
+ *
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -380,7 +379,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
- int partition = partitioner.partition(record.topic(), serializedKey, record.partition(), metadata.fetch());
+ int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -452,12 +451,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
-
+
/**
- * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
+ * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
- * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
- * A request is considered completed when it is successfully acknowledged
+ * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
+ * A request is considered completed when it is successfully acknowledged
* according to the <code>acks</code> configuration you have specified or else it results in an error.
* <p>
* Other threads can continue sending records while one thread is blocked waiting for a flush call to complete,
@@ -475,10 +474,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* consumer.commit();
* }
* </pre>
- *
+ *
* Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
* we need to set <code>retries=<large_number></code> in our config.
- *
+ *
* @throws InterruptException If the thread is interrupted while blocked
*/
@Override
@@ -550,7 +549,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void close(long timeout, TimeUnit timeUnit) {
close(timeout, timeUnit, false);
}
-
+
private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
if (timeout < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
@@ -600,6 +599,27 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw new KafkaException("Failed to close kafka producer", firstException.get());
}
+ /**
+ * Computes the partition for the given record. An explicit record partition is returned as-is
+ * after a range check (IllegalArgumentException if outside [0, numPartitions)); otherwise the
+ * partitioner configured via {@code partitioner.class} chooses one from the key and value.
+ */
+ private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
+ Integer partition = record.partition();
+ if (partition != null) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+ int numPartitions = partitions.size();
+ // caller chose the partition: it must lie in [0, numPartitions) (the message's upper bound is exclusive)
+ if (partition < 0 || partition >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + partition
+ + " is not in the range [0..."
+ + numPartitions
+ + "].");
+ return partition;
+ }
+ return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
+ }
+
private static class FutureFailure implements Future<RecordMetadata> {
private final ExecutionException exception;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3c34610..e66491c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -27,7 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
-import org.apache.kafka.clients.producer.internals.Partitioner;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.*;
@@ -41,7 +41,7 @@ import org.apache.kafka.common.*;
public class MockProducer implements Producer<byte[], byte[]> {
private final Cluster cluster;
- private final Partitioner partitioner = new Partitioner();
+ private final Partitioner partitioner = new DefaultPartitioner();
private final List<ProducerRecord<byte[], byte[]>> sent;
private final Deque<Completion> completions;
private boolean autoComplete;
@@ -49,7 +49,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Create a mock producer
- *
+ *
* @param cluster The cluster holding metadata for this producer
* @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
* the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
@@ -66,7 +66,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Create a new mock producer with invented metadata the given autoComplete setting.
- *
+ *
* Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)}
*/
public MockProducer(boolean autoComplete) {
@@ -75,7 +75,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Create a new auto completing mock producer
- *
+ *
* Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
*/
public MockProducer() {
@@ -94,14 +94,14 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Adds the record to the list of sent records.
- *
+ *
* @see #history()
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
int partition = 0;
if (this.cluster.partitionsForTopic(record.topic()) != null)
- partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster);
+ partition = partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
@@ -129,7 +129,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
return offset;
}
}
-
+
public synchronized void flush() {
while (!this.completions.isEmpty())
completeNext();
@@ -168,7 +168,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Complete the earliest uncompleted call successfully.
- *
+ *
* @return true if there was an uncompleted call to complete
*/
public synchronized boolean completeNext() {
@@ -177,7 +177,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
/**
* Complete the earliest uncompleted call with the given error.
- *
+ *
* @return true if there was an uncompleted call to complete
*/
public synchronized boolean errorNext(RuntimeException e) {
@@ -190,6 +190,26 @@ public class MockProducer implements Producer<byte[], byte[]> {
}
}
+ /**
+ * Computes the partition: an explicit record partition is range-checked and returned; otherwise the DefaultPartitioner picks one, receiving the record's byte[] key as keyBytes and its byte[] value as valueBytes.
+ */
+ private int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
+ Integer partition = record.partition();
+ if (partition != null) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+ int numPartitions = partitions.size();
+ // caller chose the partition: it must lie in [0, numPartitions) (the message's upper bound is exclusive)
+ if (partition < 0 || partition >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + partition
+ + " is not in the range [0..."
+ + numPartitions
+ + "].");
+ return partition;
+ }
+ return this.partitioner.partition(record.topic(), null, record.key(), null, record.value(), cluster);
+ }
+
+
private static class Completion {
private final long offset;
private final RecordMetadata metadata;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
new file mode 100644
index 0000000..383619d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Pluggable strategy that maps a record to a partition of its topic; selected via {@code partitioner.class}.
+ */
+
+public interface Partitioner extends Configurable {
+
+ /**
+ * Compute the partition for the given record and return its partition number.
+ *
+ * @param topic The topic name
+ * @param key The key to partition on (or null if no key)
+ * @param keyBytes The serialized key to partition on (or null if no key)
+ * @param value The value to partition on (or null)
+ * @param valueBytes The serialized value to partition on (or null)
+ * @param cluster The current cluster metadata
+ */
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
+
+ /**
+ * Called when the partitioner is closed.
+ */
+ public void close();
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 187d000..023bd2e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -3,9 +3,9 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -51,7 +51,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
-
+
/** <code>batch.size</code> */
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
@@ -169,6 +169,11 @@ public class ProducerConfig extends AbstractConfig {
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
+ /** <code>partitioner.class</code> */
+ public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
+ private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface.";
+
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -217,7 +222,8 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
+ .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+ .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
}
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
new file mode 100644
index 0000000..f81c496
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+/**
+ * The default partitioning strategy:
+ * <ul>
+ * <li>If a partition is specified in the record, the producer uses it without consulting this class
+ * <li>If no partition is specified but a key is present choose a partition based on a murmur2 hash of the serialized key
+ * <li>If no partition or key is present choose a partition round-robin, preferring available partitions</ul>
+ */
+public class DefaultPartitioner implements Partitioner {
+
+ private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+ /**
+ * A cheap way to deterministically convert a number to a non-negative value. When the input is
+ * non-negative, the original value is returned. When the input is negative, the returned value
+ * is the input bitwise-ANDed with 0x7fffffff (i.e. with the sign bit cleared), which is not its
+ * absolute value.
+ *
+ * Note: changing this method in the future will possibly cause partition selection not to be
+ * compatible with the existing messages already placed on a partition.
+ *
+ * @param number a given number
+ * @return a non-negative number (Integer.MIN_VALUE maps to 0).
+ */
+ private static int toPositive(int number) {
+ return number & 0x7fffffff;
+ }
+
+ public void configure(Map<String, ?> configs) {}
+
+ /**
+ * Compute the partition for the given record; only topic, keyBytes and cluster are consulted.
+ *
+ * @param topic The topic name
+ * @param key The key to partition on (or null if no key)
+ * @param keyBytes serialized key to partition on (or null if no key)
+ * @param value The value to partition on or null
+ * @param valueBytes serialized value to partition on or null
+ * @param cluster The current cluster metadata
+ */
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ int numPartitions = partitions.size();
+ if (keyBytes == null) {
+ int nextValue = counter.getAndIncrement();
+ List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
+ if (availablePartitions.size() > 0) {
+ int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
+ return availablePartitions.get(part).partition();
+ } else {
+ // no partition is currently available; round-robin over all partitions instead
+ return DefaultPartitioner.toPositive(nextValue) % numPartitions;
+ }
+ } else {
+ // hash the key bytes so a given key maps to the same partition while the partition count is unchanged
+ return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ }
+ }
+
+ public void close() {}
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
deleted file mode 100644
index 93e7991..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.utils.Utils;
-
-/**
- * The default partitioning strategy:
- * <ul>
- * <li>If a partition is specified in the record, use it
- * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
- * <li>If no partition or key is present choose a partition in a round-robin fashion
- * </ul>
- */
-public class Partitioner {
-
- // Round-robin counter for records without a key; starts at a random value.
- private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
- /**
- * A cheap way to deterministically convert a number to a non-negative value. When the input is
- * non-negative, the original value is returned. When the input number is negative, the returned
- * value is the original value bit AND against 0x7fffffff, which is not its absolute
- * value.
- *
- * Note: changing this method in the future will possibly cause partition selection not to be
- * compatible with the existing messages already placed on a partition.
- *
- * @param number a given number
- * @return a non-negative number (0 for both 0 and Integer.MIN_VALUE).
- */
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
-
- /**
- * Compute the partition for the given record.
- *
- * @param topic The topic name
- * @param key The key to partition on (or null if no key)
- * @param partition The partition to use (or null if none)
- * @param cluster The current cluster metadata
- * @return the explicitly given partition if non-null, otherwise a key-hash or round-robin choice
- * @throws IllegalArgumentException if {@code partition} is non-null and not in [0, number of partitions)
- */
- public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
- if (partition != null) {
- // they have given us a partition, use it
- // NOTE(review): numPartitions itself is rejected below, yet the message prints "[0...numPartitions]"
- // as if that bound were inclusive; the valid range is [0, numPartitions - 1].
- if (partition < 0 || partition >= numPartitions)
- throw new IllegalArgumentException("Invalid partition given with record: " + partition
- + " is not in the range [0..."
- + numPartitions
- + "].");
- return partition;
- } else if (key == null) {
- int nextValue = counter.getAndIncrement();
- List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
- if (availablePartitions.size() > 0) {
- int part = Partitioner.toPositive(nextValue) % availablePartitions.size();
- return availablePartitions.get(part).partition();
- } else {
- // no partitions are available, give a non-available partition
- return Partitioner.toPositive(nextValue) % numPartitions;
- }
- } else {
- // hash the key to choose a partition
- return Partitioner.toPositive(Utils.murmur2(key)) % numPartitions;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
new file mode 100644
index 0000000..977fa93
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+
+/**
+ * Tests for {@link DefaultPartitioner}: keyed records must be routed stably, and records without a key must be
+ * spread round-robin over the available partitions only.
+ */
+public class DefaultPartitionerTest {
+ private final byte[] keyBytes = "key".getBytes();
+ private final Partitioner partitioner = new DefaultPartitioner();
+ private final Node node0 = new Node(0, "localhost", 99);
+ private final Node node1 = new Node(1, "localhost", 100);
+ private final Node node2 = new Node(2, "localhost", 101);
+ private final Node[] nodes = new Node[] {node0, node1, node2};
+ private final String topic = "test";
+ // Intentionally make the partition list not in partition order to test the edge cases.
+ // Partition 1 is created with a null leader, so only partitions 0 and 2 are expected to be available.
+ private final List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
+ new PartitionInfo(topic, 2, node1, nodes, nodes),
+ new PartitionInfo(topic, 0, node0, nodes, nodes));
+ private final Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+
+ /** The same serialized key must always be mapped to the same partition. */
+ @Test
+ public void testKeyPartitionIsStable() {
+ int partition = partitioner.partition(topic, null, keyBytes, null, null, cluster);
+ assertEquals("Same key should yield same partition", partition, partitioner.partition(topic, null, keyBytes, null, null, cluster));
+ }
+
+ /** Records without a key must only go to available partitions, and be split evenly between them. */
+ @Test
+ public void testRoundRobinWithUnavailablePartitions() {
+ // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
+ // and (2) the available partitions are selected in a round robin way.
+ int countForPart0 = 0;
+ int countForPart2 = 0;
+ for (int i = 0; i < 100; i++) {
+ int part = partitioner.partition(topic, null, null, null, null, cluster);
+ assertTrue("We should never choose a leader-less partition in round robin", part == 0 || part == 2);
+ if (part == 0)
+ countForPart0++;
+ else
+ countForPart2++;
+ }
+ assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d6c45c70/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
deleted file mode 100644
index 5dadd0e..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/PartitionerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Test;
-
-/**
- * Tests for {@link Partitioner}: explicit partitions are honored, keyed records are routed stably, and records
- * without a key are spread round-robin over the available partitions only.
- */
-public class PartitionerTest {
-
- private byte[] key = "key".getBytes();
- private Partitioner partitioner = new Partitioner();
- private Node node0 = new Node(0, "localhost", 99);
- private Node node1 = new Node(1, "localhost", 100);
- private Node node2 = new Node(2, "localhost", 101);
- private Node[] nodes = new Node[] {node0, node1, node2};
- private String topic = "test";
- // Intentionally make the partition list not in partition order to test the edge cases.
- // Partition 1 is created with a null leader, so only partitions 0 and 2 are expected to be available.
- private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
- new PartitionInfo(topic, 2, node1, nodes, nodes),
- new PartitionInfo(topic, 0, node0, nodes, nodes));
- private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
-
- /** An explicitly supplied partition must be returned unchanged. */
- @Test
- public void testUserSuppliedPartitioning() {
- assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster));
- }
-
- /** The same key must always be mapped to the same partition. */
- @Test
- public void testKeyPartitionIsStable() {
- int partition = partitioner.partition("test", key, null, cluster);
- assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster));
- }
-
- /** Records without a key must only go to available partitions, and be split evenly between them. */
- @Test
- public void testRoundRobinWithUnavailablePartitions() {
- // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition,
- // and (2) the available partitions are selected in a round robin way.
- int countForPart0 = 0;
- int countForPart2 = 0;
- for (int i = 1; i <= 100; i++) {
- int part = partitioner.partition("test", null, null, cluster);
- assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2);
- if (part == 0)
- countForPart0++;
- else
- countForPart2++;
- }
- assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2);
- }
-}