Posted to jira@kafka.apache.org by "rittikaadhikari (via GitHub)" <gi...@apache.org> on 2023/02/07 06:28:54 UTC

[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

rittikaadhikari commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1097985124


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.

Review Comment:
   nit: Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))
-                    partitionsWithError += topicPartition
+

Review Comment:
   nit: extra line



##########
core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+import java.util.Optional;
+
+public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {

Review Comment:
   nit: add Javadoc
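   e.g., a class-level Javadoc along these lines (wording is only a suggestion, based on the comment and exception in `start` below):
   ```
   /**
    * A no-op tier state machine for ReplicaAlterLogDirsThread. JBOD is not
    * supported with tiered storage, so starting this state machine always
    * throws UnsupportedOperationException.
    */
   ```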



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition
    */
   private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
                                                 fetchState: PartitionFetchState,
-                                                leaderEpochInRequest: Optional[Integer],
-                                                leaderLogStartOffset: Long): Boolean = {
+                                                fetchPartitonData: PartitionData): Boolean = {
+    val leaderEpochInRequest = fetchPartitonData.currentLeaderEpoch
     try {
-      val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
-        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
+      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitonData);
+
+      // No-op for now
+      // newFetchState = fetchTierStateMachine.maybeAdvanceState(topicPartition, newFetchState).get()

Review Comment:
   maybe mark this as a TODO instead? 
   i.e., 
   ```
   // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
   ```



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition

Review Comment:
   nit: fetchPartitionData



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,

Review Comment:
   nit: Add Javadoc for this function
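   e.g., something like (wording is only a suggestion, summarizing what the method body does):
   ```
   /**
    * Starts the tier state machine for the given topic partition: fetches the
    * leader's earliest local offset, builds the follower's remote log aux state
    * up to that offset, and returns a new PartitionFetchState pointing at the
    * next offset to fetch.
    */
   ```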



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -821,8 +868,9 @@ class AbstractFetcherThreadTest {
   def testCorruptMessage(): Unit = {
     val partition = new TopicPartition("topic", 0)
 
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
+    val mockLeaderEndPoint = new MockLeaderEndPoint {
       var fetchedOnce = false
+

Review Comment:
   nit: extra new line



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<EpochEntry>("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter$.MODULE$);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, false);
+
+                // Build leader epoch cache.
+                log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented$.MODULE$);
+                Seq<EpochEntry> epochs = JavaConverters.asScala(readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)).toSeq();
+                if (log.leaderEpochCache().isDefined()) {
+                    log.leaderEpochCache().get().assign(epochs);
+                }
+
+                LOGGER.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = UnifiedLog.producerSnapshotFile(log.dir(), nextOffset);
+                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
+
+                // Reload producer snapshots.
+                log.producerStateManager().truncateFullyAndReloadSnapshots();
+                log.loadProducerState(nextOffset);
+                LOGGER.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+                                "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+                        partition, log.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+            } else {
+                throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+                        ", currentLeaderEpoch: " + currentLeaderEpoch +
+                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+                        ", leaderLogStartOffset: " + leaderLogStartOffset +
+                        ", epoch: " + targetEpoch +
+                        "as the previous remote log segment metadata was not found");
+            }
+        } else {
+            // If the tiered storage is not enabled throw an exception back so tht it will retry until the tiered storage

Review Comment:
   nit: that



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else

Review Comment:
   nit: add {} to else statement
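   i.e.,
   ```
   } else {
       targetEpoch = epochForLeaderLocalLogStartOffset;
   }
   ```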



##########
core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+import java.util.Optional;
+
+public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     FetchRequest.PartitionData fetchPartitionData) throws Exception {
+        // JBOD is not supported with tiered storage.
+        throw new UnsupportedOperationException("Building remote log aux state not supported in ReplicaAlterLogDirsThread.");

Review Comment:
   nit: Building remote log aux state is not supported in ReplicaAlterLogDirsThread.



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,

Review Comment:
   Add Javadoc for this function
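   e.g., something like (wording is only a suggestion, based on the method body and the class-level comment):
   ```
   /**
    * No-op for now: the tier state machine runs synchronously in start(), so
    * there is no state to advance and the current fetch state is returned
    * unchanged.
    */
   ```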



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition
    */
   private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
                                                 fetchState: PartitionFetchState,
-                                                leaderEpochInRequest: Optional[Integer],
-                                                leaderLogStartOffset: Long): Boolean = {
+                                                fetchPartitonData: PartitionData): Boolean = {

Review Comment:
   nit: fetchPartitionData



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,

Review Comment:
   Does this API still need to be protected?
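   It looks like it was only protected so the old `MockFetcherThread` test could override it; now that `MockTierStateMachine` overrides `start` instead, it could probably be made private, e.g.:
   ```
   private Long buildRemoteLogAuxState(TopicPartition topicPartition,
                                       Integer currentLeaderEpoch,
                                       Long leaderLocalLogStartOffset,
                                       Integer epochForLeaderLocalLogStartOffset,
                                       Long leaderLogStartOffset) throws IOException, RemoteStorageException
   ```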



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1334,6 +1399,33 @@ class AbstractFetcherThreadTest {
     }
   }
 
+  class MockTierStateMachine(leader: LeaderEndPoint,

Review Comment:
   we could set a default value of 0 for `fetchBackOffMs`
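   i.e., something like this (assuming `fetchBackOffMs: Int` is the second constructor parameter and the rest of the declaration stays as in the PR):
   ```
   class MockTierStateMachine(leader: LeaderEndPoint,
                              fetchBackOffMs: Int = 0)
   ```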



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -779,14 +823,17 @@ class AbstractFetcherThreadTest {
   @Test
   def testRetryAfterUnknownLeaderEpochInLatestOffsetFetch(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
+    val mockLeaderEndPoint = new MockLeaderEndPoint {
       val tries = new AtomicInteger(0)
+

Review Comment:
   nit: extra new line



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -664,16 +705,14 @@ class AbstractFetcherThreadTest {
   def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
     val partition = new TopicPartition("topic", 0)
     var isErrorHandled = false
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
-      override protected def buildRemoteLogAuxState(partition: TopicPartition,
-                                                    currentLeaderEpoch: Int,
-                                                    fetchOffset: Long,
-                                                    epochForFetchOffset: Int,
-                                                    leaderLogStartOffset: Long): Long = {
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint, 0) {
+      override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: FetchRequest.PartitionData): PartitionFetchState = {
         isErrorHandled = true
-        throw new FencedLeaderEpochException(s"Epoch $currentLeaderEpoch is fenced")
+        throw new FencedLeaderEpochException(s"Epoch ${currentFetchState.currentLeaderEpoch} if fenced")

Review Comment:
   nit: is



-- 
This is an automated message from the Apache Git Service.