Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 01:57:12 UTC
kafka git commit: KAFKA-3191: Improve offset committing JavaDoc in
KafkaConsumer
Repository: kafka
Updated Branches:
refs/heads/trunk feda3f68e -> 0eaede4dc
KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer
Added an example clarifying the correct way to use explicit offsets with commitSync().
Author: Adam Kunicki <ad...@streamsets.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #850 from kunickiaj/KAFKA-3191
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0eaede4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0eaede4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0eaede4d
Branch: refs/heads/trunk
Commit: 0eaede4dc95846e2b8f7452f41c58c0122e7a563
Parents: feda3f6
Author: Adam Kunicki <ad...@streamsets.com>
Authored: Mon Feb 8 16:56:59 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 8 16:56:59 2016 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 31 ++++++++++++++++++--
1 file changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0eaede4d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index afe3240..c14cc68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -217,6 +217,31 @@ import java.util.regex.Pattern;
* }
* </pre>
*
+ * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
+ * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
+ * In the example below we commit offset after we finish handling the messages in each partition.
+ * <p>
+ * <pre>
+ * try {
+ * while(running) {
+ * ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+ * for (TopicPartition partition : records.partitions()) {
+ * List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ * for (ConsumerRecord<String, String> record : partitionRecords) {
+ * System.out.println(record.offset() + ": " + record.value());
+ * }
+ * long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ * consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ * }
+ * }
+ * } finally {
+ * consumer.close();
+ * }
+ * </pre>
+ *
+ * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
+ * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
+ *
* <h4>Subscribing To Specific Partitions</h4>
*
* In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
@@ -917,7 +942,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
- * should not be used.
+ * should not be used. The committed offset should be the next message your application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
* <p>
* This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
@@ -979,7 +1005,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
- * should not be used.
+ * should not be used. The committed offset should be the next message your application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
* <p>
* This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
* (if provided) or discarded.
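
The "last processed offset + 1" rule described in the JavaDoc above can be sketched without a broker. The following is a minimal illustration only, not part of this commit: the class CommitOffsetSketch, the method offsetsToCommit, and the use of plain strings as partition names are all hypothetical stand-ins for TopicPartition and OffsetAndMetadata.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of the Kafka client API.
public class CommitOffsetSketch {

    // For each partition, return the offset that should be committed:
    // the offset of the last processed record plus one, i.e. the next
    // record the application will read. Partitions with no processed
    // records are skipped, since there is nothing new to commit.
    static Map<String, Long> offsetsToCommit(Map<String, List<Long>> processedOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, List<Long>> entry : processedOffsets.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue;
            }
            long lastOffset = offsets.get(offsets.size() - 1);
            result.put(entry.getKey(), lastOffset + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> processed = new HashMap<>();
        // Partition names here are made up for the example.
        processed.put("my-topic-0", Arrays.asList(5L, 6L, 7L));
        processed.put("my-topic-1", Collections.<Long>emptyList());
        System.out.println(offsetsToCommit(processed));
    }
}
```

In a real consumer the resulting value would be wrapped in an OffsetAndMetadata and passed to commitSync(Map), as in the JavaDoc example in the diff. Committing the last processed offset itself, without adding one, would cause that record to be delivered again after a restart or rebalance.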