Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/11 07:52:37 UTC

[GitHub] [kafka] showuon commented on a diff in pull request #12753: MINOR: Document Offset and Partition 0-indexing, fix typo

showuon commented on code in PR #12753:
URL: https://github.com/apache/kafka/pull/12753#discussion_r1019949776


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -694,6 +694,76 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
   }
 
+  @Test
+  def testFetchInvalidOffsetResetConfigEarliest(): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)
+    val totalRecords = 10L
+
+    val producer = createProducer()
+    val startingTimestamp = 0
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset =0)
+    // seek to out of range position
+    val outOfRangePos = totalRecords + 1
+    consumer.seek(tp, outOfRangePos)
+    // assert that poll resets to the beginning position
+    consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testFetchInvalidOffsetResetConfigLatest(): Unit = {
+
+    this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    val consumer = createConsumer(configOverrides = this.consumerConfig)

Review Comment:
   Try adding one more custom config and you'll get what you expected:
   `this.consumerConfig.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")`
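
   For reference, here is a minimal standalone sketch of the two overrides together. It uses plain `java.util.Properties` with the string keys that `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` and `ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG` resolve to; the object and method names are hypothetical and are not part of the test harness.

```scala
import java.util.Properties

// Hypothetical helper, not part of PlaintextConsumerTest.
object FetchWaitConfigSketch {
  def latestResetProps(): Properties = {
    val props = new Properties()
    // Reset to the log end when the fetch offset is out of range.
    props.setProperty("auto.offset.reset", "latest")
    // Ask the broker to answer an empty FETCH immediately instead of waiting.
    props.setProperty("fetch.max.wait.ms", "0")
    props
  }
}
```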



Review Comment:
   Some explanation:
   To reduce consumer/follower fetch overhead, the broker waits for a while when there are no records at the requested offset, which reduces the network traffic between client and broker. For example, in your test you send 10 records to the broker. After the consumer has received those 10 records (offsets 0-9), the next consumer poll tries to fetch offset 10, but the broker has no record at offset 10 yet. So when the broker receives the FETCH request, it waits on the broker side for up to `fetch.max.wait.ms` (default 500 ms). If records arrive within those 500 ms, it responds to the client. As you can imagine, that reduces the network traffic a lot. If no records have arrived after 500 ms, it responds with an empty result. So your current test runs like this:
   
   1. send 10 records to the broker
   2. consumer polls 10 records and sends one more FETCH for offset 10
   3. consumer seeks to 17
   4. consumer polls, but does not send a FETCH request because there is already an in-flight FETCH request that has not been answered yet
   5. send another 10 records
   6. consumer polls and gets 10 records, but they are discarded because the fetcher finds that the starting offset is not the expected one, so it sends another FETCH starting from offset 17
   7. consumer receives records starting from offset 17
   
   If we set `fetch.max.wait.ms` to 0, it'll be like this:
   1. send 10 records to the broker
   2. consumer polls 10 records and sends one more FETCH for offset 10, **which is answered with an empty result immediately**
   3. consumer seeks to 17
   //4. consumer polls, but does not send a FETCH request because there is already an in-flight FETCH request that has not been answered yet
   4. consumer polls, sends a FETCH request, gets an out-of-range error, and resets its position to latest (10)
   5. send another 10 records
   //6. consumer polls and gets 10 records, but they are discarded because the fetcher finds that the starting offset is not the expected one, so it sends another FETCH starting from offset 17
   6. consumer polls and gets 10 records
   7. consumer receives records starting from offset **10**
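
   The two timelines above can be replayed with a toy model. This is only a sketch and not Kafka's real fetcher: it tracks a single partition, ignores real time (it assumes steps 3-5 all happen inside the wait window), and every name in it (`FetchTimelineSketch`, `Outcome`, `replay`) is made up for illustration.

```scala
// Toy model only: NOT Kafka's Fetcher. It replays the numbered steps for one
// partition and reports the first offset the consumer receives after the seek.
object FetchTimelineSketch {

  final case class Outcome(firstOffset: Long, discardedStaleResponse: Boolean)

  def replay(fetchMaxWaitMs: Int, seekTo: Long = 17L): Outcome = {
    var logEndOffset = 0L                         // next offset the broker will assign
    var position = 0L                             // next offset the consumer will fetch
    var inFlightFetchOffset: Option[Long] = None  // FETCH held by the broker, if any

    // Step 1: send 10 records (offsets 0-9).
    logEndOffset += 10

    // Step 2: consume offsets 0-9, then send one more FETCH for offset 10.
    position = logEndOffset
    // A non-zero wait keeps that empty FETCH pending; a wait of 0 answers it at once.
    if (fetchMaxWaitMs > 0) inFlightFetchOffset = Some(position)

    // Step 3: seek past the end of the log.
    position = seekTo

    // Step 4: poll. A new FETCH goes out only if none is in flight; if it is
    // out of range, the position resets to latest (auto.offset.reset=latest).
    if (inFlightFetchOffset.isEmpty && position > logEndOffset) position = logEndOffset

    // Step 5: send 10 more records (offsets 10-19); a pending FETCH now completes.
    logEndOffset += 10

    // Step 6: poll. A response whose starting offset differs from the current
    // position is discarded, and a fresh FETCH is sent from the position.
    val discarded = inFlightFetchOffset.exists(_ != position)
    inFlightFetchOffset = None
    if (position > logEndOffset) position = logEndOffset // still out of range: reset again

    // Step 7: the first record received is the one at the current position.
    Outcome(position, discarded)
  }

  def main(args: Array[String]): Unit = {
    println(replay(fetchMaxWaitMs = 500)) // default wait
    println(replay(fetchMaxWaitMs = 0))   // broker answers empty fetches immediately
  }
}
```

   In this model the default wait yields a first offset of 17 with one discarded response, while a wait of 0 yields a first offset of 10 with nothing discarded, matching the two step lists.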
   
   Hope that's clear


