Posted to commits@kafka.apache.org by sh...@apache.org on 2022/12/16 09:13:05 UTC

[kafka] branch trunk updated: MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3585a4cd5d MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)
e3585a4cd5d is described below

commit e3585a4cd5ddb5b8475a49c38143d18e7c640bfe
Author: Daniel Scanteianu <da...@gmail.com>
AuthorDate: Fri Dec 16 09:12:40 2022 +0000

    MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)
    
    Add comments to clarify that both offsets and partitions are 0-indexed, and fix a minor typo. Clarify which offset will be retrieved by poll() after seek() is used in various circumstances. Also add integration tests.
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      | 21 ++++++++++
 .../kafka/clients/producer/ProducerRecord.java     |  2 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 49 +++++++++++++++++++++-
 .../java/org/apache/kafka/queue/EventQueue.java    |  2 +-
 4 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cf85798f82b..1d756d1e64c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1581,7 +1581,28 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
      * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
      * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.
+     * <p>
+     * The next record retrieved when poll() is invoked will have the specified offset, provided that
+     * a record with that offset exists (i.e. it is a valid offset).
+     * <p>
+     * {@link #seekToBeginning(Collection)} will move to the first offset of each given partition.
+     * seek(partition, 0) is equivalent to seekToBeginning for a TopicPartition whose beginning offset is 0,
+     * assuming that a record at offset 0 is still available.
+     * {@link #seekToEnd(Collection)} is equivalent to seeking to the last offset of the partition, but behavior depends on
+     * {@code isolation.level}, so see {@link #seekToEnd(Collection)} documentation for more details.
+     * <p>
+     * Seeking to an offset smaller than the log start offset or larger than the log end offset
+     * is considered invalid.
+     * Invalid offset behaviour is controlled by the {@code auto.offset.reset} property.
+     * If this is set to "earliest", the next poll will return records from the starting offset.
+     * If it is set to "latest", the consumer will seek to the last offset (similar to seekToEnd()).
+     * If it is set to "none", an {@code OffsetOutOfRangeException} will be thrown.
+     * <p>
+     * Note that the seek offset does not apply to any in-flight fetch request; it only takes effect on the next fetch.
+     * The consumer may therefore wait up to {@code fetch.max.wait.ms} before it starts fetching records from the desired offset.
      *
+     * @param partition the TopicPartition on which the seek will be performed.
+     * @param offset the offset from which the next poll() will start returning records.
      * @throws IllegalArgumentException if the provided offset is negative
      * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
      */
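
To make the documented seek() semantics concrete, here is a minimal, self-contained sketch (not part of this commit). The topic name, group id, and bootstrap address are hypothetical placeholders.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
            props.put("group.id", "seek-demo");               // hypothetical group id
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            // Out-of-range seeks are reset to the log start offset.
            props.put("auto.offset.reset", "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Partitions are 0-indexed: partition 0 is the first partition.
                TopicPartition tp = new TopicPartition("demo-topic", 0);
                consumer.assign(Collections.singletonList(tp));
                // Offsets are 0-indexed too: seeking to 0 behaves like
                // seekToBeginning() as long as offset 0 is still available.
                consumer.seek(tp, 0);
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r ->
                    System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            }
        }
    }

With auto.offset.reset set to "earliest" as above, seeking past the log end offset would make the next poll() start again from the log start offset rather than throw an OffsetOutOfRangeException.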
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 0fa37dc15d0..3e43f27ea2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -28,7 +28,7 @@ import java.util.Objects;
  * <p>
  * If a valid partition number is specified that partition will be used when sending the record. If no partition is
  * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
- * present a partition will be assigned in a round-robin fashion.
+ * present a partition will be assigned in a round-robin fashion. Note that partition numbers are 0-indexed.
  * <p>
  * The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the
  * record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for
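
As a quick illustration of the 0-indexing note above, a minimal sketch (not part of this commit) covering the three partitioning cases; the topic name, keys, and values are hypothetical.

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionIndexExample {
        public static void main(String[] args) {
            // Explicit partition: 0 targets the first partition of the topic.
            ProducerRecord<String, String> explicit =
                new ProducerRecord<>("demo-topic", 0, "key-a", "v1");

            // Key but no partition: the partition is chosen by hashing the key.
            ProducerRecord<String, String> keyed =
                new ProducerRecord<>("demo-topic", "key-a", "v2");

            // Neither key nor partition: the partition is assigned by the
            // producer, as described in the javadoc above.
            ProducerRecord<String, String> unkeyed =
                new ProducerRecord<>("demo-topic", "v3");

            System.out.println(explicit.partition()); // 0
            System.out.println(keyed.partition());    // null until the partitioner runs
            System.out.println(unkeyed.partition());  // null as well
        }
    }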
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index bfc280a1678..740fd8a0780 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -672,7 +672,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testFetchInvalidOffset(): Unit = {
     this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
-    val consumer = createConsumer()
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
 
     // produce two records
     val totalRecords = 2
@@ -694,6 +694,53 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    // ensure no in-flight fetch request so that the offset can be reset immediately
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+
+  @Test
+  def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    // ensure no in-flight fetch request so that the offset can be reset immediately
+    this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    // consume some, but not all of the records
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt / 2, startingOffset = 0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 17 // arbitrary, much higher offset
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the ending position
+    assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+    val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+    // ensure the offset was reset to the log end offset as of the previous poll
+    assertEquals(totalRecords, nextRecord.offset())
+  }
+
   @Test
   def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
     val maxFetchBytes = 10 * 1024
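
The two new tests set fetch.max.wait.ms to 0 so that no in-flight fetch request delays the offset reset, matching the note added to the seek() javadoc above. A minimal sketch (not part of this commit) of that configuration in plain Java:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ResetConfig {
        static Properties fastResetConfig() {
            Properties props = new Properties();
            // "earliest", "latest", or "none", as exercised by the tests above.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            // Avoid the fetch long-poll so an out-of-range seek is reset promptly.
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0");
            return props;
        }
    }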
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index 7529bdd973b..d0c752e641d 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -40,7 +40,7 @@ public interface EventQueue extends AutoCloseable {
          *              its deadline before it could be scheduled.
          *              It will be a RejectedExecutionException if the event could not be
          *              scheduled because the event queue has already been closed.
-         *              Otherweise, it will be whatever exception was thrown by run().
+         *              Otherwise, it will be whatever exception was thrown by run().
          */
         default void handleException(Throwable e) {}
     }