You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/06 01:01:10 UTC

[GitHub] [kafka] junrao commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

junrao commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r779191474



##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier epoch
+              if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader)

Review comment:
       TopicBasedRemoteLogMetadataManager independently updates the metadata state from the tier topic. When we make the `rlm.fetchRemoteLogSegmentMetadata` call, how do we make sure that it has caught up on enough records from the tier topic, including the one for the segment covering the requested offset?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier epoch
+              if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader)
+
+          if (rlsMetadata.isPresent) {
+            val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+            epochs.foreach { epochEntry =>
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset))
+            }

Review comment:
       Should we flush the leader epoch file at the end?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +396,76 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                leaderLogStartOffset: Long): Unit = {
+    replicaMgr.localLog(partition).foreach(log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach(rlm => {
+          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
+          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
+          if (epoch.isDefined) {
+            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)
+          } else {
+            // If epoch is not available, then it might be possible that this broker might lost its entire local storage.
+            // We may also have to build the leader epoch cache. To find out the remote log segment metadata for the
+            // leaderLocalLogStartOffset-1, start from the current leader epoch and subtract one to the epoch till
+            // finding the metadata.
+            var previousLeaderEpoch = currentLeaderEpoch
+            while (!rlsMetadata.isPresent && previousLeaderEpoch >= 0) {
+              rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, previousLeaderEpoch, leaderLocalLogStartOffset - 1)
+              previousLeaderEpoch -= 1
+            }
+          }
+          if (rlsMetadata.isPresent) {
+            val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream, log.dir)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+            epochs.foreach(epochEntry => {
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset))
+            })
+            info(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+              s"with size: ${epochs.size} for $partition")
+
+            // Restore producer snapshot
+            val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, leaderLocalLogStartOffset)
+            Files.copy(rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),

Review comment:
       Should we write the producer snapshot to a temp file first and then rename?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {

Review comment:
       Add a space after `if`.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +727,58 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      (epoch, leaderLogStartOffset) => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset))

Review comment:
       When we hit OffsetOutOfRangeException, it's possible that remote storage is enabled. In that case, we also need to rebuild the remote log metadata.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier epoch
+              if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader)
+
+          if (rlsMetadata.isPresent) {
+            val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+            epochs.foreach { epochEntry =>
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset))
+            }
+            info(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+              s"with size: ${epochs.size} for $partition")
+
+            // Restore producer snapshot
+            val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, leaderLocalLogStartOffset)
+            Files.copy(rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+              snapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+            log.producerStateManager.reloadSnapshots()
+            log.loadProducerState(leaderLocalLogStartOffset, reloadFromCleanShutdown = false)
+            info(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " +

Review comment:
       It's possible that the endOffset in rlsMetadata is larger than leaderLocalLogStartOffset. If we fetch the local log from leaderLocalLogStartOffset, duplicate txns could be added to producerStateManager. It's better to start fetching from rlsMetadata.endOffset + 1.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -224,8 +229,11 @@ class ReplicaFetcherThread(name: String,
     }
     val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse]
     if (!fetchSessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
-      // If we had a session topic ID related error, throw it, otherwise return an empty fetch data map.
-      if (fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
+      // If we had a session topic ID related error or unknown topic id or inconsistent topic id, throw it,
+      // otherwise return an empty fetch data map.
+      if (fetchResponse.error == Errors.UNKNOWN_TOPIC_ID ||
+        fetchResponse.error == Errors.FETCH_SESSION_TOPIC_ID_ERROR ||
+        fetchResponse.error == Errors.INCONSISTENT_TOPIC_ID) {

Review comment:
       This is the top level error. UNKNOWN_TOPIC_ID and INCONSISTENT_TOPIC_ID are partition level errors and don't need to be handled here.

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -715,6 +727,58 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {

Review comment:
       Why does this method need to be protected instead of private? Ditto for fetchOffsetAndBuildRemoteLogAuxState.

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier epoch
+              if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader)
+                earlierEpochEndOffset.leaderEpoch() // This gives the respective leader epoch, will handle any gaps in epochs
+              else epochForLeaderLocalLogStartOffset
+            }
+          }
+
+          val rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, highestOffsetInRemoteFromLeader)
+
+          if (rlsMetadata.isPresent) {
+            val epochStream = rlm.storageManager().fetchIndex(rlsMetadata.get(), RemoteStorageManager.IndexType.LEADER_EPOCH)
+            val epochs = readLeaderEpochCheckpoint(epochStream)
+
+            // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+            truncateFullyAndStartAt(partition, leaderLocalLogStartOffset)
+
+            log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+            epochs.foreach { epochEntry =>
+              log.leaderEpochCache.map(cache => cache.assign(epochEntry.epoch, epochEntry.startOffset))

Review comment:
       Should we use foreach instead of map?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -386,11 +397,109 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Unit = {
+
+    def fetchEarlierEpochEndOffset(epoch:Int): EpochEndOffset = {
+        val previousEpoch = epoch - 1
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+          .setCurrentLeaderEpoch(currentLeaderEpoch)
+          .setLeaderEpoch(previousEpoch))
+        val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+        if (maybeEpochEndOffset.isEmpty) {
+          throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        val epochEndOffset = maybeEpochEndOffset.get
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+          throw Errors.forCode(epochEndOffset.errorCode()).exception()
+        }
+
+        epochEndOffset
+    }
+
+    replicaMgr.localLog(partition).foreach { log =>
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        replicaMgr.remoteLogManager.foreach { rlm =>
+
+          // Find the respective leader epoch for (leaderLogStartOffset - 1)
+          val highestOffsetInRemoteFromLeader = leaderLogStartOffset - 1
+          val targetEpoch: Int = {
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if(epochForLeaderLocalLogStartOffset == 0) {
+              epochForLeaderLocalLogStartOffset
+            } else {
+              // Fetch the earlier epoch/end-offset from the leader.
+              val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+              // Check if the target offset lies with in the range of earlier epoch
+              if (earlierEpochEndOffset.endOffset >= highestOffsetInRemoteFromLeader)

Review comment:
       endOffset is exclusive. So, we could just use leaderLogStartOffset.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org