Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/02/10 08:31:20 UTC

[GitHub] [kafka] dajac opened a new pull request, #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

dajac opened a new pull request, #13230:
URL: https://github.com/apache/kafka/pull/13230

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "rreddy-22 (via GitHub)" <gi...@apache.org>.
rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103221020


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   The method is called on lines 457/458 -> truncateOnFetchResponse(divergingEndOffsets)
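   For illustration, a minimal sketch of how the collected map is consumed once the whole fetch response has been handled; the shape is inferred from the quoted diff and this comment rather than copied from the source:

       // Collected per partition while iterating over the fetch response:
       val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]

       // ... the per-partition loop adds an entry whenever the leader returns a
       // diverging epoch (see the diff above) ...

       // Once the whole response has been handled, every diverging partition is
       // truncated in a single call:
       if (divergingEndOffsets.nonEmpty)
         truncateOnFetchResponse(divergingEndOffsets)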





[GitHub] [kafka] jolshan commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103127795


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1089,6 +1089,85 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+    assumeTrue(truncateOnFetch)
+
+    val partition = new TopicPartition("topic", 0)
+
+    var truncateCalls = 0
+    var processPartitionDataCalls = 0
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+        processPartitionDataCalls += 1
+        super.processPartitionData(topicPartition, fetchOffset, partitionData)
+      }
+
+      override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+        truncateCalls += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+    )
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 1L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+    assertEquals(6L, replicaState.logEndOffset)
+    fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+    )
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 4L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    fetcher.verifyLastFetchedEpoch(partition, Some(2))
+
+    // The log end offset is truncated.
+    assertEquals(3L, replicaState.logEndOffset)
+
+    // The high watermark is not updated.
+    assertEquals(1L, replicaState.highWatermark)
+
+    // The first fetch should result in truncating the follower's log and

Review Comment:
   The "first fetch" mentioned here is the fetcher.doWork() on line 1139. Would it make sense to but the comment there? Since it applies also to the checks on 1143 and 1146.





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "rreddy-22 (via GitHub)" <gi...@apache.org>.
rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103225207


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   This eventually goes to the truncate function in ReplicaFetcherThread, where our meter is marked and the "Truncating below high watermark" warning is logged.
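   For illustration, a rough sketch of the path referred to here; the shape of ReplicaFetcherThread.truncate is recalled from memory and not verified against this branch, and the meter mark mentioned above is not shown:

       override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
         val partition = replicaMgr.getPartitionOrException(tp)
         val log = partition.localLogOrException

         partition.truncateTo(offsetTruncationState.offset, isFuture = false)

         // This is the warning the comment above refers to: the follower is
         // truncating to an offset below its current high watermark.
         if (offsetTruncationState.offset < log.highWatermark)
           warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark ${log.highWatermark}")
       }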
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103129876


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1387,13 +1466,8 @@ class AbstractFetcherThreadTest {
       val state = replicaPartitionState(topicPartition)
 
       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
-        val divergingEpoch = partitionData.divergingEpoch
-        truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
-          .setPartition(topicPartition.partition)
-          .setErrorCode(Errors.NONE.code)
-          .setLeaderEpoch(divergingEpoch.epoch)
-          .setEndOffset(divergingEpoch.endOffset)))
-        return None
+        throw new IllegalStateException("processPartitionData should not be called for a partition with " +

Review Comment:
   Is this called anywhere in tests where we would throw the error? If not, it's interesting that this was implemented and never used.





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "rreddy-22 (via GitHub)" <gi...@apache.org>.
rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103225207


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   This eventually goes to the truncate function in ReplicaFetcherThread.
   





[GitHub] [kafka] dajac commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1104324436


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1089,6 +1089,85 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+    assumeTrue(truncateOnFetch)
+
+    val partition = new TopicPartition("topic", 0)
+
+    var truncateCalls = 0
+    var processPartitionDataCalls = 0
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+        processPartitionDataCalls += 1
+        super.processPartitionData(topicPartition, fetchOffset, partitionData)
+      }
+
+      override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
+        truncateCalls += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+    )
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 1L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+    assertEquals(6L, replicaState.logEndOffset)
+    fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+    )
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 4L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    fetcher.doWork()
+    fetcher.verifyLastFetchedEpoch(partition, Some(2))
+
+    // The log end offset is truncated.
+    assertEquals(3L, replicaState.logEndOffset)
+
+    // The high watermark is not updated.
+    assertEquals(1L, replicaState.highWatermark)
+
+    // The first fetch should result in truncating the follower's log and

Review Comment:
   Ack. Changed it.





[GitHub] [kafka] dajac commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1104725297


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   I am for not processing partition data when there is a diverging epoch. That is safer in my opinion.





[GitHub] [kafka] dajac commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1104325598


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   The batching adds little value in my opinion. However, my intent was really to not process the partition data at all when truncating, since processing it in that case does not seem safe.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103436599


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   I was wondering about that too. We seem to batch together the truncating partitions, but I'm not sure there is any benefit.  That said, it seems a little safer not to assume that `processPartitionData` will still be valid in the presence of divergence.
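   For readers following the thread, a condensed sketch of the control flow under discussion, reconstructed from the quoted diff; the else branch stands in for the unchanged processPartitionData path:

       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
         // Diverging epoch: only record the truncation point; processPartitionData
         // is NOT called, so the low/high watermarks are left untouched until the
         // truncation has actually been applied.
         val divergingEpoch = partitionData.divergingEpoch
         divergingEndOffsets += topicPartition -> new EpochEndOffset()
           .setPartition(topicPartition.partition)
           .setErrorCode(Errors.NONE.code)
           .setLeaderEpoch(divergingEpoch.epoch)
           .setEndOffset(divergingEpoch.endOffset)
       } else {
         // No divergence: hand the data to the subclass and update fetch state and
         // lag metrics as before (pre-existing path, elided here).
         processPartitionData(topicPartition, currentFetchState.fetchOffset, partitionData)
       }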





[GitHub] [kafka] dajac commented on pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13230:
URL: https://github.com/apache/kafka/pull/13230#issuecomment-1429361939

   Merged to trunk, 3.4 and 3.3.




[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "rreddy-22 (via GitHub)" <gi...@apache.org>.
rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103221020


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   The method is called on line 457 -> truncateOnFetchResponse(divergingEndOffsets)





[GitHub] [kafka] dajac commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1104324176


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1387,13 +1466,8 @@ class AbstractFetcherThreadTest {
       val state = replicaPartitionState(topicPartition)
 
       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
-        val divergingEpoch = partitionData.divergingEpoch
-        truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
-          .setPartition(topicPartition.partition)
-          .setErrorCode(Errors.NONE.code)
-          .setLeaderEpoch(divergingEpoch.epoch)
-          .setEndOffset(divergingEpoch.endOffset)))
-        return None
+        throw new IllegalStateException("processPartitionData should not be called for a partition with " +

Review Comment:
   Yeah, this was wrongly implemented. With the old version, all tests were truncating twice.
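   For context: with the old mock, processPartitionData itself called truncateOnFetchResponse on divergence, on top of the truncation already driven by AbstractFetcherThread, hence the double truncation. The new guard is roughly the following, with the tail of the truncated message string assumed:

       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
         // Fail loudly instead of truncating a second time.
         throw new IllegalStateException("processPartitionData should not be called for a partition with " +
           "a diverging epoch") // wording of the tail is assumed; the quoted diff cuts it off
       }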





[GitHub] [kafka] jolshan commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1104723201


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   So are we deciding for now just not to process partition data? Or move the truncation into this handling?





[GitHub] [kafka] dajac merged pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13230:
URL: https://github.com/apache/kafka/pull/13230




[GitHub] [kafka] hachikuji commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103426128


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1089,6 +1089,85 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {

Review Comment:
   nit: wonder if we should change the name. The test is not really doing anything with the high watermark; it is just verifying that `processPartitionData` is not called. Maybe a test like this could exist for `ReplicaFetcherThread`?
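   For context, the assertions sit below the quoted excerpt, but given the truncateCalls and processPartitionDataCalls counters declared in the test, the check referred to here is presumably along these lines (exact expected values assumed):

       // After the first doWork(): the follower truncated but did not append.
       assertEquals(1, truncateCalls)
       assertEquals(0, processPartitionDataCalls)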





[GitHub] [kafka] jolshan commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103074467


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   Just for my understanding -- the truncation is done elsewhere, right? In theory, if it were done here, we could process the partition data afterwards?
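   To illustrate the alternative being asked about (this is not what the PR does; the names come from the quoted diff and the inline call is hypothetical):

       if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
         val divergingEpoch = partitionData.divergingEpoch
         // Hypothetical: apply the truncation right here, for this partition only...
         truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
           .setPartition(topicPartition.partition)
           .setErrorCode(Errors.NONE.code)
           .setLeaderEpoch(divergingEpoch.epoch)
           .setEndOffset(divergingEpoch.endOffset)))
       }
       // ...after which the partition data could, in theory, be processed as usual.
       processPartitionData(topicPartition, currentFetchState.fetchOffset, partitionData)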


