Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 01:57:12 UTC

kafka git commit: KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer

Repository: kafka
Updated Branches:
  refs/heads/trunk feda3f68e -> 0eaede4dc


KAFKA-3191: Improve offset committing JavaDoc in KafkaConsumer

Added an example clarifying the correct way to use explicit offsets with commitSync().

Author: Adam Kunicki <ad...@streamsets.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #850 from kunickiaj/KAFKA-3191


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0eaede4d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0eaede4d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0eaede4d

Branch: refs/heads/trunk
Commit: 0eaede4dc95846e2b8f7452f41c58c0122e7a563
Parents: feda3f6
Author: Adam Kunicki <ad...@streamsets.com>
Authored: Mon Feb 8 16:56:59 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 8 16:56:59 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 31 ++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0eaede4d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index afe3240..c14cc68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -217,6 +217,31 @@ import java.util.regex.Pattern;
  *     }
  * </pre>
  *
+ * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
+ * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
+ * In the example below we commit the offset after we finish handling the messages in each partition.
+ * <p>
+ * <pre>
+ *     try {
+ *         while (running) {
+ *             ConsumerRecords&lt;String, String&gt; records = consumer.poll(Long.MAX_VALUE);
+ *             for (TopicPartition partition : records.partitions()) {
+ *                 List&lt;ConsumerRecord&lt;String, String&gt;&gt; partitionRecords = records.records(partition);
+ *                 for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
+ *                     System.out.println(record.offset() + &quot;: &quot; + record.value());
+ *                 }
+ *                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ *                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ *             }
+ *         }
+ *     } finally {
+ *         consumer.close();
+ *     }
+ * </pre>
+ *
+ * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
+ * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
+ *
  * <h4>Subscribing To Specific Partitions</h4>
  *
  * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
@@ -917,7 +942,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
-     * should not be used.
+     * should not be used. The committed offset should be the offset of the next message your application will
+     * consume, i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
@@ -979,7 +1005,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
-     * should not be used.
+     * should not be used. The committed offset should be the offset of the next message your application will
+     * consume, i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
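
The "last processed offset + 1" rule from the new JavaDoc can be illustrated without a broker. The following is only a minimal sketch in plain Java: the class NextOffsets, the helper offsetsToCommit, and the use of String partition names in place of TopicPartition are all hypothetical and are not part of the Kafka client. It shows the arithmetic an application would do before passing a map to commitSync(offsets).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NextOffsets {
    // Hypothetical helper (not a Kafka API): for each partition, take the offset
    // of the last processed record and add one, giving the offset to commit.
    static Map<String, Long> offsetsToCommit(Map<String, List<Long>> processed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> entry : processed.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue; // nothing processed for this partition, nothing to commit
            }
            long lastOffset = offsets.get(offsets.size() - 1);
            // Commit the position of the NEXT record to read, not the last one read.
            result.put(entry.getKey(), lastOffset + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        // Partition names and offsets here are made-up illustration data.
        Map<String, List<Long>> processed = new LinkedHashMap<>();
        processed.put("topic-0", Arrays.asList(40L, 41L, 42L));
        processed.put("topic-1", Arrays.asList(7L));
        System.out.println(offsetsToCommit(processed)); // {topic-0=43, topic-1=8}
    }
}
```

Committing 42 instead of 43 for topic-0 would cause the record at offset 42 to be delivered again after a restart or rebalance, which is the mistake the added note warns against.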