Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/13 09:26:33 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12753: MINOR: Document Offset and Partition 0-indexing, fix typo

showuon commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1046857998


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property.

Review Comment:
   nit: These 2 lines can be combined into one line:
   `Invalid offset behaviour is controlled by`
   `the {@link ConsumerConfig AUTO_RESET_CONFIG} property.`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).

Review Comment:
   nit: `ie:` -> `i.e.`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.

Review Comment:
   Could we say:
   {seekToEnd(Collection)} is equivalent to seeking to `the highest known offset + 1` -> {seekToEnd(Collection)} is equivalent to seeking to `the last offset of the partition`.
   That's the wording used in the Javadoc of the `seekToEnd` method.
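
To make the two candidate wordings concrete, here is a minimal sketch of how the positions relate. It is a toy model, not Kafka code: the class `PartitionOffsetsSketch` and its methods are hypothetical, and it assumes the retained offsets are contiguous (no gaps from compaction or transaction markers).

```java
import java.util.OptionalLong;

/** Toy model of one partition's offset range; hypothetical names, not Kafka classes. */
final class PartitionOffsetsSketch {
    private final long beginningOffset; // offset of the first record still retained
    private final long recordCount;     // number of retained records, assumed contiguous

    PartitionOffsetsSketch(long beginningOffset, long recordCount) {
        this.beginningOffset = beginningOffset;
        this.recordCount = recordCount;
    }

    /** Position a seekToBeginning-style call would land on. */
    long beginningPosition() {
        return beginningOffset;
    }

    /** Position a seekToEnd-style call would land on: one past the highest existing offset. */
    long endPosition() {
        return beginningOffset + recordCount;
    }

    /** Highest offset that actually holds a record, if the partition is not empty. */
    OptionalLong highestRecordOffset() {
        return recordCount == 0 ? OptionalLong.empty() : OptionalLong.of(endPosition() - 1);
    }

    /** True when seeking to offset 0 is the same as seeking to the beginning. */
    boolean seekZeroEqualsBeginning() {
        return beginningOffset == 0;
    }

    public static void main(String[] args) {
        PartitionOffsetsSketch fresh = new PartitionOffsetsSketch(0, 5);   // records at offsets 0..4
        PartitionOffsetsSketch trimmed = new PartitionOffsetsSketch(3, 2); // records at offsets 3..4

        System.out.println(fresh.endPosition());               // 5
        System.out.println(fresh.seekZeroEqualsBeginning());   // true
        System.out.println(trimmed.beginningPosition());       // 3
        System.out.println(trimmed.seekZeroEqualsBeginning()); // false
    }
}
```

In this model the end position is always the highest record offset plus one, which is why the two phrasings can describe the same position while reading differently.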



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property.
+     * If this is set to "earliest", the next poll will return records from the starting offset.
+     * If it is set to "latest", it will seek to the last known record (similar to seekToEnd()).
+     * If it is set to "none", an {@link OffsetOutOfRangeException} will be thrown.
+     * <p>
+     * Note that, the seek offset won't change to the in-flight fetch request, it will take effect in next fetch request.
+     * So, the consumer might wait for {@code fetch.max.wait.ms} before starting to fetch the records from desired offset.
      *
+     * @param offset the next offset returned by poll() will be either this or greater.

Review Comment:
   Since we're here, should we also add a `@param` for `partition`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.

Review Comment:
   is equivalent to `seek` to the highest known offset + 1



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property.
+     * If this is set to "earliest", the next poll will return records from the starting offset.
+     * If it is set to "latest", it will seek to the last known record (similar to seekToEnd()).
+     * If it is set to "none", an {@link OffsetOutOfRangeException} will be thrown.

Review Comment:
   I don't think this format will produce a working link to `OffsetOutOfRangeException`. Maybe we just use `{@code}`:
   * If it is set to "none", an {@code OffsetOutOfRangeException} will be thrown.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.

Review Comment:
   nit: `{@link KafkaConsumer#seekToBeginning(Collection)}` -> `{@link #seekToBeginning(Collection)}`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.

Review Comment:
   nit: `{@link #seekToEnd(Collection)}`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,

Review Comment:
   for a topic [partition] with beginning offset 0



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1581,7 +1581,27 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+     * <p>
+     * The next Consumer Record which will be retrieved when poll() is invoked will have the offset specified, given that
+     * a record with that offset exists (ie: it is a valid offset).
+     * <p>
+     * {@link KafkaConsumer#seekToBeginning(Collection)} will go to the first offset in the topic.
+     * seek(0) is equivalent to seekToBeginning for a topic with beginning offset 0,
+     * assuming that there is a record at offset 0 still available.
+     * {@link KafkaConsumer#seekToEnd(Collection)} is equivalent to seeking to the highest known offset + 1.
+     * <p>
+     * Seeking to the offset smaller than the log start offset or larger than the log end offset
+     * or high watermark means an invalid offset is reached.
+     * Invalid offset behaviour is controlled by
+     * the {@link ConsumerConfig AUTO_RESET_CONFIG} property.
+     * If this is set to "earliest", the next poll will return records from the starting offset.
+     * If it is set to "latest", it will seek to the last known record (similar to seekToEnd()).

Review Comment:
   nit: it will seek to the last `known record` -> it will seek to the last `offset`
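
The invalid-offset paragraph quoted above can be sketched as a small decision function. This is a toy model under stated assumptions, not the consumer's implementation: `SeekResetSketch`, `ResetPolicy`, and `resolveFetchOffset` are hypothetical names, a single `endOffset` stands in for the upper bound (log end offset or high watermark), and an `IllegalStateException` stands in for the `OffsetOutOfRangeException` mentioned in the Javadoc.

```java
/** Toy model of the invalid-offset handling described in the Javadoc; not Kafka's code. */
final class SeekResetSketch {

    /** Hypothetical stand-in for the "earliest" / "latest" / "none" reset setting. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Returns the offset the next fetch would start from.
     *
     * @param requested   the offset passed to a seek-style call
     * @param startOffset first offset still available in the partition
     * @param endOffset   one past the last readable offset (assumed upper bound)
     * @param policy      what to do when the requested offset is out of range
     */
    static long resolveFetchOffset(long requested, long startOffset, long endOffset, ResetPolicy policy) {
        boolean inRange = requested >= startOffset && requested <= endOffset;
        if (inRange) {
            return requested;
        }
        switch (policy) {
            case EARLIEST:
                return startOffset; // restart from the first available offset
            case LATEST:
                return endOffset;   // jump to the end, similar to a seekToEnd-style call
            default:
                // Stand-in for the OffsetOutOfRangeException named in the Javadoc.
                throw new IllegalStateException("offset " + requested + " is out of range ["
                        + startOffset + ", " + endOffset + "]");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveFetchOffset(5, 0, 10, ResetPolicy.NONE));      // 5
        System.out.println(resolveFetchOffset(42, 0, 10, ResetPolicy.EARLIEST)); // 0
        System.out.println(resolveFetchOffset(42, 0, 10, ResetPolicy.LATEST));   // 10
    }
}
```

Note that in this sketch the "latest" branch returns an offset (the end position) rather than a record, which matches the suggestion to say "last offset" instead of "last known record".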


