You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/12/16 09:13:05 UTC
[kafka] branch trunk updated: MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3585a4cd5d MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)
e3585a4cd5d is described below
commit e3585a4cd5ddb5b8475a49c38143d18e7c640bfe
Author: Daniel Scanteianu <da...@gmail.com>
AuthorDate: Fri Dec 16 09:12:40 2022 +0000
MINOR: Document Offset and Partition 0-indexing, fix typo (#12753)
Add comments to clarify that both offsets and partitions are 0-indexed, and fix a minor typo. Clarify which offset will be retrieved by poll() after seek() is used in various circumstances. Also added integration tests.
Reviewers: Luke Chen <sh...@gmail.com>
---
.../kafka/clients/consumer/KafkaConsumer.java | 21 ++++++++++
.../kafka/clients/producer/ProducerRecord.java | 2 +-
.../kafka/api/PlaintextConsumerTest.scala | 49 +++++++++++++++++++++-
.../java/org/apache/kafka/queue/EventQueue.java | 2 +-
4 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cf85798f82b..1d756d1e64c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1581,7 +1581,28 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+ * <p>
* The next consumer record that will be retrieved when poll() is invoked will have the specified offset,
* provided that a record with that offset exists (i.e. it is a valid offset).
+ * <p>
+ * {@link #seekToBeginning(Collection)} will go to the first offset in the topic.
+ * seek(0) is equivalent to seekToBeginning for a TopicPartition with beginning offset 0,
+ * assuming that there is a record at offset 0 still available.
+ * {@link #seekToEnd(Collection)} is equivalent to seeking to the last offset of the partition, but behavior depends on
+ * {@code isolation.level}, so see {@link #seekToEnd(Collection)} documentation for more details.
+ * <p>
* Seeking to an offset smaller than the log start offset or larger than the log end offset
* is considered seeking to an invalid offset.
+ * Invalid offset behaviour is controlled by the {@code auto.offset.reset} property.
+ * If this is set to "earliest", the next poll will return records from the starting offset.
+ * If it is set to "latest", it will seek to the last offset (similar to seekToEnd()).
+ * If it is set to "none", an {@code OffsetOutOfRangeException} will be thrown.
+ * <p>
* Note that the seek offset does not apply to an already in-flight fetch request; it takes effect on the next fetch request.
* As a result, the consumer might wait up to {@code fetch.max.wait.ms} before it starts fetching records from the desired offset.
*
+ * @param partition the TopicPartition on which the seek will be performed.
* @param offset the offset to seek to; if it is a valid offset, this will be the offset of the next record returned by poll().
* @throws IllegalArgumentException if the provided offset is negative
* @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index 0fa37dc15d0..3e43f27ea2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -28,7 +28,7 @@ import java.util.Objects;
* <p>
* If a valid partition number is specified that partition will be used when sending the record. If no partition is
* specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
- * present a partition will be assigned in a round-robin fashion.
+ * present a partition will be assigned in a round-robin fashion. Note that partition numbers are 0-indexed.
* <p>
* The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the
* record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index bfc280a1678..740fd8a0780 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -672,7 +672,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
@Test
def testFetchInvalidOffset(): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
- val consumer = createConsumer()
+ val consumer = createConsumer(configOverrides = this.consumerConfig)
// produce one record
val totalRecords = 2
@@ -694,6 +694,53 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
}
+ @Test
+ def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ // ensure no in-flight fetch request so that the offset can be reset immediately
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+ val consumer = createConsumer(configOverrides = this.consumerConfig)
+ val totalRecords = 10L
+
+ val producer = createProducer()
+ val startingTimestamp = 0
+ sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+ consumer.assign(List(tp).asJava)
+ consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+ // seek to out of range position
+ val outOfRangePos = totalRecords + 1
+ consumer.seek(tp, outOfRangePos)
+ // assert that poll resets to the beginning position
+ consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+ }
+
+
+ @Test
+ def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+ // ensure no in-flight fetch request so that the offset can be reset immediately
+ this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+ val consumer = createConsumer(configOverrides = this.consumerConfig)
+ val totalRecords = 10L
+
+ val producer = createProducer()
+ val startingTimestamp = 0
+ sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+ consumer.assign(List(tp).asJava)
+ consumer.seek(tp, 0)
+ // consume some, but not all of the records
+ consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt/2, startingOffset = 0)
+ // seek to out of range position
+ val outOfRangePos = totalRecords + 17 // arbitrary, much higher offset
+ consumer.seek(tp, outOfRangePos)
+ // assert that poll resets to the ending position
+ assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty)
+ sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = totalRecords)
+ val nextRecord = consumer.poll(Duration.ofMillis(50)).iterator().next()
+ // ensure the seek went to the last known record at the time of the previous poll
+ assertEquals(totalRecords, nextRecord.offset())
+ }
+
@Test
def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
val maxFetchBytes = 10 * 1024
diff --git a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
index 7529bdd973b..d0c752e641d 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/EventQueue.java
@@ -40,7 +40,7 @@ public interface EventQueue extends AutoCloseable {
* its deadline before it could be scheduled.
* It will be a RejectedExecutionException if the event could not be
* scheduled because the event queue has already been closed.
- * Otherweise, it will be whatever exception was thrown by run().
+ * Otherwise, it will be whatever exception was thrown by run().
*/
default void handleException(Throwable e) {}
}