Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/01 20:29:33 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r533648722



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+              (currentState == null || currentState.state == Fetching)) {
+      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
       This check is still a little hard to follow. I think we expect that if `initOffset` is negative, then `lastFetchedEpoch` will be empty and we will hit the `fetchOffsetAndTruncate` case below. Is that right? On the other hand, if `lastFetchedEpoch` is empty, then `initOffset` could still be non-negative if we have an old message format, which means we need to enter `Truncating` so that we can truncate to the high watermark.
   
    One case that is not so clear is when `currentState` is non-null. Then we will enter the `Truncating` state below regardless of whether `isTruncationOnFetchSupported` is set or not. Is that what we want?
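
    A rough sketch of the branching described above may make the cases easier to compare. It is not the code in this PR: the types are simplified stand-ins, the `Outcome` labels and the `decide` helper are hypothetical, and the last two branches are assumptions taken from the wording of this comment (negative `initOffset` goes to `fetchOffsetAndTruncate`, everything else enters `Truncating`), since the rest of the method is not shown in the hunk.

```scala
object FetchStateSketch {
  // Simplified, hypothetical stand-ins for the real Kafka types.
  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  final case class InitialFetchState(initOffset: Long,
                                     currentLeaderEpoch: Int,
                                     lastFetchedEpoch: Option[Int])

  final case class PartitionFetchState(fetchOffset: Long,
                                       currentLeaderEpoch: Int,
                                       state: ReplicaState)

  // Outcome labels exist only in this sketch.
  sealed trait Outcome
  case object KeepCurrent extends Outcome
  case object StartFetching extends Outcome
  case object StartTruncating extends Outcome
  case object FetchOffsetAndTruncate extends Outcome

  def decide(truncationOnFetch: Boolean,
             initial: InitialFetchState,
             current: Option[PartitionFetchState]): Outcome = {
    if (current.exists(_.currentLeaderEpoch == initial.currentLeaderEpoch))
      KeepCurrent // same leader epoch: keep the existing state
    else if (truncationOnFetch && initial.initOffset >= 0 &&
             initial.lastFetchedEpoch.nonEmpty &&
             current.forall(_.state == Fetching))
      StartFetching // mirrors the condition in the hunk above
    else if (initial.initOffset < 0)
      FetchOffsetAndTruncate // assumption: the case "below" for a negative offset
    else
      StartTruncating // assumption: e.g. old message format, truncate to high watermark
  }
}
```

    With this reading, a non-null `currentState` that is already `Truncating` and has a different leader epoch ends up in `StartTruncating` whether or not the flag is set, which is the case the question above is about.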

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -669,11 +714,18 @@ abstract class AbstractFetcherThread(name: String,
     Option(partitionStates.stateValue(topicPartition))
   }
 
+  /**
+   * Returns current fetch state for each partition assigned to this thread. This is used to reassign
+   * partitions when thread pool is resized. We return `lastFetchedEpoch=None` to ensure we go through

Review comment:
       This is probably ok. I guess an alternative would be to not take the initial last fetched epoch from `InitialFetchState`, but instead use `latestEpoch`.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -102,6 +103,7 @@ class ReplicaFetcherThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
   private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsTruncationOnFetch = ApiVersion.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion)

Review comment:
       nit: I don't think we need this. We can override `isTruncationOnFetchSupported` with a `val`.
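
       A minimal sketch of that suggestion, assuming `isTruncationOnFetchSupported` is declared as an abstract parameterless `def` on the base class. `FetcherBase`, `ReplicaFetcher`, `describe`, and the constructor parameter are hypothetical names used only for illustration, not the actual Kafka classes.

```scala
object OverrideSketch {
  abstract class FetcherBase {
    // Assumed shape of the abstract member on the base fetcher class.
    protected def isTruncationOnFetchSupported: Boolean

    // Hypothetical helper so the sketch has something observable.
    def describe: String =
      if (isTruncationOnFetchSupported) "truncate-on-fetch" else "epoch-request"
  }

  class ReplicaFetcher(ibpSupportsTruncationOnFetch: Boolean) extends FetcherBase {
    // A val may implement a parameterless def. It is evaluated once at
    // construction, so no separate private field is needed.
    override protected val isTruncationOnFetchSupported: Boolean =
      ibpSupportsTruncationOnFetch
  }
}
```

       One caveat with this pattern: if the base class constructor reads the member, it sees the default value (`false`) because the subclass `val` is not yet initialized at that point.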




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org