You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "rreddy-22 (via GitHub)" <gi...@apache.org> on 2023/02/10 20:39:03 UTC

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13230: KAFKA-14704; Follower should truncate before incrementing high watermark

rreddy-22 commented on code in PR #13230:
URL: https://github.com/apache/kafka/pull/13230#discussion_r1103221020


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
-                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no exception during processPartitionData
-                        val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log of the replica
+                      // but we don't process the partition data in order to not update the
+                      // low/high watermarks until the truncation is actually done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new EpochEndOffset()

Review Comment:
   The method is called in line 457 -> truncateOnFetchResponse(divergingEndOffsets)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org