Posted to jira@kafka.apache.org by "mattwong949 (via GitHub)" <gi...@apache.org> on 2023/02/06 22:08:47 UTC

[GitHub] [kafka] mattwong949 opened a new pull request, #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

mattwong949 opened a new pull request, #13206:
URL: https://github.com/apache/kafka/pull/13206

   This PR adds the TierStateMachine interface to handle all state transitions related to tiered storage and building the remote log aux state.
   
   The new interface exposes two methods, `start` and `maybeAdvanceState`. In `ReplicaFetcherTierStateMachine`, `maybeAdvanceState` is unused because the implementation is synchronous; only `start` is needed. This PR keeps `maybeAdvanceState` because there is an existing task to build the remote log aux state asynchronously, which will be able to use the full interface: https://issues.apache.org/jira/browse/KAFKA-13560
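
A minimal sketch of the interface shape described above may help. The types here (`FetchState`, a `String` partition name, a `long` offset argument) are simplified, hypothetical stand-ins and not the signatures used in the PR, which works with `TopicPartition`, `PartitionFetchState` and the fetch response partition data.

```java
import java.util.Optional;

// Hypothetical, simplified illustration of a two-method tier state machine.
final class TierStateMachineSketch {

    /** Simplified fetch state: only the offset the follower fetches from next. */
    static final class FetchState {
        final long fetchOffset;

        FetchState(long fetchOffset) {
            this.fetchOffset = fetchOffset;
        }
    }

    /** A start step plus an optional advance step. */
    interface TierStateMachine {
        FetchState start(String topicPartition, FetchState current, long leaderLocalLogStartOffset);

        Optional<FetchState> maybeAdvanceState(String topicPartition, FetchState current);
    }

    /** Synchronous variant: start() does all the work, maybeAdvanceState() is a no-op. */
    static final class SynchronousTierStateMachine implements TierStateMachine {
        @Override
        public FetchState start(String topicPartition, FetchState current, long leaderLocalLogStartOffset) {
            // A real implementation would build the remote log aux state here
            // before resuming the fetch at the leader's local log start offset.
            return new FetchState(leaderLocalLogStartOffset);
        }

        @Override
        public Optional<FetchState> maybeAdvanceState(String topicPartition, FetchState current) {
            // Nothing to advance: the state was fully built in start().
            return Optional.of(current);
        }
    }

    public static void main(String[] args) {
        TierStateMachine tsm = new SynchronousTierStateMachine();
        FetchState next = tsm.start("topic-0", new FetchState(0L), 5L);
        System.out.println("next fetch offset: " + next.fetchOffset);
    }
}
```

An asynchronous variant could return an intermediate state from `start` and complete the transition in `maybeAdvanceState`, which is why both methods are kept on the interface.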


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100057051


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {

Review Comment:
   Note: if the rlmMetadata is unavailable for an extended period of time, the replica fetcher will keep retrying indefinitely to construct the starting fetch state for the partition. Each retry issues an `OffsetForLeaderEpoch` request and a `ListOffsets` request. If a large number of partitions are impacted, that will generate unnecessary inter-broker traffic on the cluster, although marginal most of the time. As an optimization, we could store the leader epoch associated with the leader's local log start offset - 1 which was retrieved here (we would still, however, need to query for the local log start offset on the leader on every iteration, and fetch the associated leader epoch if it has changed).
   
   The asynchronous resolution of the correct fetch state from remote storage (KAFKA-13560) will prevent the extra load on the replica fetcher thread itself. The consideration above applies to the RPCs which are made on the synchronous fetch path.
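
The caching optimization suggested above could be sketched roughly as follows. This is only an illustration under stated assumptions: `TargetEpochCacheSketch`, its `targetEpoch` method and the `epochLookup` callback (standing in for the `OffsetForLeaderEpoch` round trip) are hypothetical names, partitions are keyed by a plain string, and the class is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongToIntFunction;

// Hypothetical sketch: remember the epoch resolved for (leaderLocalLogStartOffset - 1)
// and only ask the leader again when the local log start offset has moved.
final class TargetEpochCacheSketch {

    private static final class Entry {
        final long leaderLocalLogStartOffset;
        final int targetEpoch;

        Entry(long leaderLocalLogStartOffset, int targetEpoch) {
            this.leaderLocalLogStartOffset = leaderLocalLogStartOffset;
            this.targetEpoch = targetEpoch;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();

    /**
     * Returns the epoch of offset (leaderLocalLogStartOffset - 1). The epochLookup
     * callback stands in for the RPC to the leader and is only invoked on a cache
     * miss or when the leader's local log start offset has changed.
     */
    int targetEpoch(String topicPartition, long leaderLocalLogStartOffset, LongToIntFunction epochLookup) {
        Entry cached = cache.get(topicPartition);
        if (cached != null && cached.leaderLocalLogStartOffset == leaderLocalLogStartOffset) {
            return cached.targetEpoch;
        }
        int epoch = epochLookup.applyAsInt(leaderLocalLogStartOffset - 1);
        cache.put(topicPartition, new Entry(leaderLocalLogStartOffset, epoch));
        return epoch;
    }

    public static void main(String[] args) {
        TargetEpochCacheSketch cache = new TargetEpochCacheSketch();
        // Toy lookup: offsets below 90 belong to epoch 1, the rest to epoch 3.
        LongToIntFunction lookup = offset -> offset < 90 ? 1 : 3;
        System.out.println(cache.targetEpoch("topic-0", 90L, lookup));
    }
}
```

The local log start offset still has to be fetched from the leader on every retry, as noted above; the sketch only avoids the repeated epoch lookup while that offset is unchanged.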





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664534


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
        * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
        * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
        * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
        * brokers and producers.
        *
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's (local-log-start-offset or) log start offset.
+       * and the current leader's log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   +1 to @Hangleton 's comment. I restored this function back to the original logic, so this function is only called when first trying to fetch the leader log start offset (after starting fetch or after getting the offset out of range error). If the follower gets the `OFFSET_MOVED_TO_TIERED_STORAGE` error, it proceeds to the other code path to build the remote aux log state via the TierStateMachine interface.





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method returned with or without error. It's mentioned in the documentation description, following the convention of the other handler functions.





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   the returned value indicates whether the handler method returned with or without error. I'll update the documentation





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099635815


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))
-                    partitionsWithError += topicPartition
+

Review Comment:
   this seems to be the white space convention in this part of the code





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099641934


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   makes sense, I can remove it to clean up the implementation for now





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116390136


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -615,14 +649,29 @@ class AbstractFetcherThreadTest {
   @Test
   def testFollowerFetchMovedToTieredStore(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
       mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
       mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
     val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      // override the start() of MockTierStateMachine to mimic truncateFullyAndStartAt and update the replicaState in the MockFetcherThread
+      override def start(topicPartition: TopicPartition,
+                         currentFetchState: PartitionFetchState,
+                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+        replicaState.log.clear()
+        replicaState.localLogStartOffset = 5
+        replicaState.logEndOffset = 5

Review Comment:
   hm yeah I see what you are saying, I can make this change. the only somewhat tricky part is that we pass in the MockTierStateMachine to the fetcher itself, so we'd have to pass the reference to the fetcher to the MockTierStateMachine after both are instantiated
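
A minimal sketch of that two-step wiring, assuming hypothetical simplified mock classes rather than the actual test classes in the PR: the fetcher receives the tier state machine in its constructor, and the tier state machine receives the fetcher through a setter once both exist.

```java
// Hypothetical sketch of breaking a circular dependency between two mocks
// by injecting the back-reference after construction.
final class MockWiringSketch {

    static final class MockFetcher {
        final MockTierStateMachine tierStateMachine;
        long logEndOffset = 0L;

        MockFetcher(MockTierStateMachine tierStateMachine) {
            this.tierStateMachine = tierStateMachine;
        }

        /** Stand-in for truncating the replica log and restarting it at the given offset. */
        void truncateFullyAndStartAt(long offset) {
            logEndOffset = offset;
        }
    }

    static final class MockTierStateMachine {
        private MockFetcher fetcher; // set after both objects are instantiated

        void setFetcher(MockFetcher fetcher) {
            this.fetcher = fetcher;
        }

        /** Updates the fetcher's replica state and returns the next fetch offset. */
        long start(long leaderLocalLogStartOffset) {
            if (fetcher == null) {
                throw new IllegalStateException("fetcher has not been set");
            }
            fetcher.truncateFullyAndStartAt(leaderLocalLogStartOffset);
            return leaderLocalLogStartOffset;
        }
    }

    public static void main(String[] args) {
        MockTierStateMachine tsm = new MockTierStateMachine();
        MockFetcher fetcher = new MockFetcher(tsm);
        tsm.setFetcher(fetcher); // second wiring step
        System.out.println(fetcher.tierStateMachine.start(5L));
    }
}
```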





[GitHub] [kafka] junrao merged pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao merged PR #13206:
URL: https://github.com/apache/kafka/pull/13206




[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1102707234


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
        * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
        * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
        * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
        * brokers and producers.
        *
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's (local-log-start-offset or) log start offset.
+       * and the current leader's log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   IIUC, this is correct. We should try to fetch from the leader's log start offset instead of the leader's local log start offset so that, in the case where the leader's log start offset < the leader's local log start offset, the leader returns an `OFFSET_MOVED_TO_TIERED_STORAGE` error and the follower takes the related code path to reconstruct the local replica log prefix.
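
The routing described here can be sketched as below. This is a simplified illustration, not the broker's actual logic: the enum names and the `leaderResponse` and `followerAction` helpers are hypothetical, and the upper bound check against the leader's log end offset is omitted.

```java
// Hypothetical sketch: which error the leader returns for a fetch offset,
// and which code path the follower takes in response.
final class FetchErrorRoutingSketch {

    enum LeaderResponse { NONE, OFFSET_OUT_OF_RANGE, OFFSET_MOVED_TO_TIERED_STORAGE }

    enum FollowerAction { APPEND_RECORDS, HANDLE_OUT_OF_RANGE, START_TIER_STATE_MACHINE }

    /** Assumes logStartOffset <= localLogStartOffset on the leader. */
    static LeaderResponse leaderResponse(long fetchOffset, long logStartOffset, long localLogStartOffset) {
        if (fetchOffset < logStartOffset) {
            // The offset no longer exists anywhere, locally or remotely.
            return LeaderResponse.OFFSET_OUT_OF_RANGE;
        }
        if (fetchOffset < localLogStartOffset) {
            // The offset exists, but only in remote (tiered) storage.
            return LeaderResponse.OFFSET_MOVED_TO_TIERED_STORAGE;
        }
        return LeaderResponse.NONE;
    }

    static FollowerAction followerAction(LeaderResponse response) {
        switch (response) {
            case OFFSET_OUT_OF_RANGE:
                return FollowerAction.HANDLE_OUT_OF_RANGE;
            case OFFSET_MOVED_TO_TIERED_STORAGE:
                return FollowerAction.START_TIER_STATE_MACHINE;
            default:
                return FollowerAction.APPEND_RECORDS;
        }
    }

    public static void main(String[] args) {
        // Leader log start offset 0, local log start offset 5: fetching from 0
        // lands in the tiered range, so the tier state machine path is taken.
        System.out.println(followerAction(leaderResponse(0L, 0L, 5L)));
    }
}
```

Fetching from the leader's log start offset rather than its local log start offset is what lets the second branch fire when segments have been moved to remote storage.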



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.

Review Comment:
   I think this case refers to line 674. This comment reverts to the original wording from before the change introduced for tiered storage (TS). Agreed that it could be made clearer, though, perhaps by explicitly referencing the case below.





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116326600


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   hm ok I think I see what you're suggesting. Let me try the refactor w/o the callback. thanks Jun





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116323021


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -27,9 +27,11 @@ import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+//import org.apache.kafka.common.requests.FetchRequest.PartitionData

Review Comment:
   yup I think I did this from Rittika's comment in an earlier commit





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100799705


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   sorry, thought I had it removed in my previous commits, but I guess not. I'll pick it up in the next commits





[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "rittikaadhikari (via GitHub)" <gi...@apache.org>.
rittikaadhikari commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116201462


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -26,10 +26,11 @@ import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
-import org.apache.kafka.common.requests.FetchRequest.PartitionData
+//import org.apache.kafka.common.requests.FetchRequest.PartitionData

Review Comment:
   nit: remove commented import





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113713186


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   I wanted the MockTierStateMachine code to be invoked as part of the test, as a sanity check that the TierStateMachine logic is called from `handleOffsetsMovedToTieredStorage`. I couldn't think of another way to update the replicaPartitionState from the MockTierStateMachine, since that state is contained in the MockFetcherThread.
   
   If we instead override `doWork` in the MockFetcherThread to update the replicaPartitionState, I'm not sure how effective the test would be, given that my goal was to invoke the MockTierStateMachine code. But maybe I'm misunderstanding the alternative you mentioned.





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664534


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
        * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
        * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
        * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
        * brokers and producers.
        *
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's (local-log-start-offset or) log start offset.
+       * and the current leader's log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   +1 to @Hangleton's comment. I restored this function to its original logic, so it is only called when first trying to fetch the leader's log start offset. If the follower gets the `OFFSET_MOVED_TO_TIERED_STORAGE` error, it proceeds to the code path that builds the remote log aux state.
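
For reference, the rule stated in the restored code comment ("fetch from the higher one of its replica log end offset and the current leader's log start offset") can be sketched in Java as below. This is only an illustration: `FetchOffsetSketch` and its method names are hypothetical, and the real logic lives in the Scala `AbstractFetcherThread`.

```java
// Illustrative sketch; class and method names are hypothetical.
final class FetchOffsetSketch {
    // First case in the comment: the follower's log end offset is below the
    // leader's log start offset, so the follower truncates its whole log.
    static boolean mustTruncateFully(long replicaEndOffset, long leaderStartOffset) {
        return leaderStartOffset > replicaEndOffset;
    }

    // Both cases combined: fetch from the higher of the two offsets.
    static long nextFetchOffset(long replicaEndOffset, long leaderStartOffset) {
        return Math.max(replicaEndOffset, leaderStartOffset);
    }
}
```

For example, a follower whose log ends at offset 3 while the leader's log starts at 10 would truncate fully and resume at 10, whereas a follower at 12 would keep its segments and retry from 12.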





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117432801


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
   yup makes sense I'll move the interface over





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117568502


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
   Actually, I remember why I didn't initially put it in the storage package. Moving it there wouldn't be a simple change, because the TierStateMachine (TSM) relies on the PartitionFetchState case class in AbstractFetcherThread.scala. It would have created a circular dependency across modules, which I wanted to avoid. @junrao, what do you think about keeping it in the core module in this PR?





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method completed with or without an error. It's mentioned in the documentation, but I'll update it to be more noticeable.
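
A rough Java sketch of that return-value convention, assuming a handler that reports success as a boolean rather than propagating the exception. `HandlerResultSketch`, `Recovery` and `handle` are hypothetical names used only for illustration; they are not the methods in this PR.

```java
// Illustrative only; these names are hypothetical and not part of Kafka.
final class HandlerResultSketch {
    // Stand-in for the recovery work, which may fail with an exception.
    interface Recovery {
        void run() throws Exception;
    }

    // Returns true if the recovery finished without error, false otherwise.
    static boolean handle(Recovery recovery) {
        try {
            recovery.run();
            return true;
        } catch (Exception e) {
            // The caller can use the false result to treat the partition
            // as failed and retry it later.
            return false;
        }
    }
}
```

A caller would then branch on the result, for example by adding the partition to a set of partitions with errors when `handle` returns false.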





[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "satishd (via GitHub)" <gi...@apache.org>.
satishd commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1098319549


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   nit: the convention, based on the majority of the classes, is to name it `log`.



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   Should we introduce this when we have the async implementation with `maybeAdvanceState`?
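
For context on the synchronous versus asynchronous distinction, here is a minimal Java sketch of a two-method state machine in which the synchronous implementation does all its work in `start` and never advances. The types are heavily simplified assumptions: `TierStateMachineSketch`, `FetchState`, `StateMachine` and `SynchronousStateMachine` are hypothetical, and the signatures are not those of the interface in this PR.

```java
// Simplified sketch; every type and signature here is hypothetical.
final class TierStateMachineSketch {
    // Minimal stand-in for a partition's fetch state.
    static final class FetchState {
        final long fetchOffset;

        FetchState(long fetchOffset) {
            this.fetchOffset = fetchOffset;
        }
    }

    interface StateMachine {
        // Begins the transition and returns the next fetch state.
        FetchState start(FetchState current, long leaderLocalStartOffset);

        // Lets an asynchronous implementation report progress later on.
        java.util.Optional<FetchState> maybeAdvanceState(FetchState current);
    }

    // Synchronous variant: start does everything, so advancing is a no-op.
    static final class SynchronousStateMachine implements StateMachine {
        @Override
        public FetchState start(FetchState current, long leaderLocalStartOffset) {
            // A real implementation would build the remote log aux state here.
            return new FetchState(leaderLocalStartOffset);
        }

        @Override
        public java.util.Optional<FetchState> maybeAdvanceState(FetchState current) {
            return java.util.Optional.empty();
        }
    }
}
```

In this shape, a back-off setting would only matter to an implementation whose `maybeAdvanceState` polls for completion, which is the situation the question above is about.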



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;

Review Comment:
   nit: may want to rename it as `leaderLocalStartOffset`



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());

Review Comment:
   nit: It is better to be specific here like `epochAndLeaderLocalStartOffset`.





[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1101875350


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   Thanks, Satish. Agreed. Since this one will be implemented as async eventually, there is probably no need to set LogContext. We can keep this as static. 





[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "satishd (via GitHub)" <gi...@apache.org>.
satishd commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1101000469


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   This is always a static field unless it is loaded with LogContext.





[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116386328


##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -615,14 +649,29 @@ class AbstractFetcherThreadTest {
   @Test
   def testFollowerFetchMovedToTieredStore(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
       mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
       mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
     val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      // override the start() of MockTierStateMachine to mimic truncateFullyAndStartAt and update the replicaState in the MockFetcherThread
+      override def start(topicPartition: TopicPartition,
+                         currentFetchState: PartitionFetchState,
+                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+        replicaState.log.clear()
+        replicaState.localLogStartOffset = 5
+        replicaState.logEndOffset = 5

Review Comment:
   Instead of customizing `start()` just for this test, I am wondering if we could make this more general. For example, could we pass `fetcher` in to MockTierStateMachine and call `fetcher.truncateFullyAndStartAt()` in `MockTierStateMachine.start()`?
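
   A rough sketch of that idea, using simplified stand-in types rather than the real test classes. `Fetcher`, `FakeFetcher`, and `MockTierStateMachineSketch` are hypothetical names, the topic partition is reduced to a string, and `start()` is reduced to returning the next fetch offset:

```java
// Hypothetical stand-in for the part of MockFetcherThread that the mock needs.
interface Fetcher {
    void truncateFullyAndStartAt(String topicPartition, long offset);
}

// Hypothetical fake that records the replica state a test would assert on.
final class FakeFetcher implements Fetcher {
    long localLogStartOffset = 0L;
    long logEndOffset = 3L;

    @Override
    public void truncateFullyAndStartAt(String topicPartition, long offset) {
        // Drop the local log and restart it at the given offset.
        localLogStartOffset = offset;
        logEndOffset = offset;
    }
}

// The mock receives the fetcher, so individual tests do not need to override start().
final class MockTierStateMachineSketch {
    private final Fetcher fetcher;
    private final long leaderLocalLogStartOffset;

    MockTierStateMachineSketch(Fetcher fetcher, long leaderLocalLogStartOffset) {
        this.fetcher = fetcher;
        this.leaderLocalLogStartOffset = leaderLocalLogStartOffset;
    }

    // Truncates the follower via the fetcher and returns the offset to fetch from next.
    long start(String topicPartition) {
        fetcher.truncateFullyAndStartAt(topicPartition, leaderLocalLogStartOffset);
        return leaderLocalLogStartOffset;
    }
}
```

   With this shape, the replica state is updated through the fetcher itself instead of through a per-test anonymous subclass.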





[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116408589


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
   Should this be in the storage module under org.apache.kafka.server.log.remote.storage package? Eventually, we could move ReplicaFetcherTierStateMachine to the storage module too.



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -615,14 +649,29 @@ class AbstractFetcherThreadTest {
   @Test
   def testFollowerFetchMovedToTieredStore(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
       mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
       mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
     val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      // override the start() of MockTierStateMachine to mimic truncateFullyAndStartAt and update the replicaState in the MockFetcherThread
+      override def start(topicPartition: TopicPartition,
+                         currentFetchState: PartitionFetchState,
+                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+        replicaState.log.clear()
+        replicaState.localLogStartOffset = 5
+        replicaState.logEndOffset = 5

Review Comment:
   Got it. So, if we do that, it's the same as the callback logic you had earlier.
   
   Here is another way to improve this. Since AbstractFetcherThread already exposes the `partitionState` through `fetchState()`, we could just get rid of `replicaPartitionStates` in MockFetcherThread and purely rely on the `partitionState` that's changed on every `doWork()` call. Will that work? Since that's a bigger change, we could probably just take the callback code you had in this PR and make the bigger change in a separate PR.





[GitHub] [kafka] junrao commented on pull request #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on PR #13206:
URL: https://github.com/apache/kafka/pull/13206#issuecomment-1419883513

   cc @satishd 




[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   ~~I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?~~
   
   nvm I was able to change it to `log`





[GitHub] [kafka] showuon commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1102353791


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.

Review Comment:
   nit: Could we remove the `AbstractFetcherThread` class name here? Just `This interface defines the APIs needed to handle any state transitions related to tiering` to make it general. Otherwise, this will confuse readers if something changes in `AbstractFetcherThread`.



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.
+ */
+public interface TierStateMachine {
+
+    /**
+     * Start the tier state machine for the provided topic partition.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    PartitionFetchState start(TopicPartition topicPartition,
+                              PartitionFetchState currentFetchState,
+                              PartitionData fetchPartitionData) throws Exception;
+
+    /**
+     * Optionally advance the state of the tier state machine, based on the
+     * current PartitionFetchState. The decision to advance the tier
+     * state machine is implementation specific.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the new PartitionFetchState if the tier state machine was advanced

Review Comment:
   nit: return the new PartitionFetchState if the tier state machine was advanced, **otherwise, return the currentFetchState**
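
   To illustrate the contract I have in mind, here is a minimal sketch with simplified stand-in types. `FetchStateSketch` and `AdvanceSketch` are hypothetical, as are the `building` flag and the `auxStateReady` parameter, and the real method may well wrap its result differently:

```java
// Hypothetical, simplified stand-in for PartitionFetchState.
final class FetchStateSketch {
    final long fetchOffset;
    final boolean building; // true while the remote log aux state is still being built

    FetchStateSketch(long fetchOffset, boolean building) {
        this.fetchOffset = fetchOffset;
        this.building = building;
    }
}

final class AdvanceSketch {
    // Returns a new state when the machine advances; otherwise returns
    // the caller's current state unchanged.
    static FetchStateSketch maybeAdvanceState(FetchStateSketch current, boolean auxStateReady) {
        if (current.building && auxStateReady) {
            return new FetchStateSketch(current.fetchOffset, false);
        }
        return current;
    }
}
```

   Documenting the "otherwise" branch tells callers they can always use the returned value without a separate check.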



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.

Review Comment:
   I read through the code and comments, and this is not correct. This comment describes the `leaderEndOffset >= replicaEndOffset` case, not `leaderStartOffset`. The `leaderStartOffset` check is a sub-case under `leaderEndOffset >= replicaEndOffset`. So, maybe change to:
   
   In the first case, [if] the follower's current log end offset is smaller than the leader's log start offset, the follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log start offset since the data are all stale.
   
   WDYT?
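
   For reference, a minimal sketch of the decision as the comment describes it for the `leaderEndOffset >= replicaEndOffset` case. `OutOfRangeSketch` and its method names are hypothetical, and the offsets are plain longs rather than values fetched from the leader:

```java
final class OutOfRangeSketch {
    // The follower's log is entirely stale when it ends before the leader's log starts.
    static boolean shouldTruncateFully(long replicaEndOffset, long leaderStartOffset) {
        return replicaEndOffset < leaderStartOffset;
    }

    // Putting the two cases together: fetch from the higher of the two offsets.
    static long offsetToFetch(long replicaEndOffset, long leaderStartOffset) {
        return Math.max(replicaEndOffset, leaderStartOffset);
    }
}
```

   For example, with a follower log end offset of 3 and a leader log start offset of 5, the follower truncates fully and fetches from 5; with a follower log end offset of 7, it keeps its segments and retries from 7.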



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
   For the `logStartOffset`, we used to retrieve it from `partitionData`, which comes from the fetch response (from the leader). Now we get it from `fetchPartitionData`, which comes from the fetch request. Is there a reason for this change? I think the logStartOffset should still rely on the leader's response to avoid inconsistency. WDYT?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
        * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
        * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
        * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
        * brokers and producers.
        *
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's (local-log-start-offset or) log start offset.
+       * and the current leader's log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   Could you explain why we changed this behavior? I think we should try to fetch from the local log start offset if possible, to save time when catching up. But after this change, we always fetch from the log start offset (not the local log start offset). Why should we change it?





[GitHub] [kafka] ijuma commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1104734391


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.
+ */
+public interface TierStateMachine {

Review Comment:
   Why is this not in the `storage` module?





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664441


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
   Ah, I believe this was a mistake. I wanted `TierStateMachine.start` to use the fetch response as well. Thanks for the catch.
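
For readers following the thread, a rough sketch of the interface shape under discussion: `start` receives the fetch response data, and `maybeAdvanceState` is a no-op in the synchronous case. All type names ending in `Sketch` are hypothetical stand-ins rather than the PR's real classes, and the rule of resuming at the later of the two offsets is an assumption made only for illustration.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for the partition fetch state and the
// fetch response partition data; the real classes carry far more fields.
final class FetchStateSketch {
    final long fetchOffset;
    FetchStateSketch(long fetchOffset) { this.fetchOffset = fetchOffset; }
}

final class FetchResponseSketch {
    final long logStartOffset;
    FetchResponseSketch(long logStartOffset) { this.logStartOffset = logStartOffset; }
}

interface TierStateMachineSketch {
    // Begin the tier state transition using data from the fetch response.
    FetchStateSketch start(String topicPartition, FetchStateSketch current, FetchResponseSketch response);

    // Optionally move an in-progress transition forward.
    Optional<FetchStateSketch> maybeAdvanceState(String topicPartition, FetchStateSketch current);
}

// Synchronous variant: start() does all the work, so advancing is a no-op.
final class SyncTierStateMachineSketch implements TierStateMachineSketch {
    // Assumed to be known up front; a real implementation would ask the leader.
    private final long leaderLocalLogStartOffset;

    SyncTierStateMachineSketch(long leaderLocalLogStartOffset) {
        this.leaderLocalLogStartOffset = leaderLocalLogStartOffset;
    }

    @Override
    public FetchStateSketch start(String topicPartition, FetchStateSketch current, FetchResponseSketch response) {
        // Assumption for illustration: resume at the later of the two offsets.
        long next = Math.max(response.logStartOffset, leaderLocalLogStartOffset);
        return new FetchStateSketch(next);
    }

    @Override
    public Optional<FetchStateSketch> maybeAdvanceState(String topicPartition, FetchStateSketch current) {
        // Nothing left to advance after a synchronous start().
        return Optional.of(current);
    }
}
```

Passing the whole response object into `start`, rather than a single offset, leaves room for an asynchronous implementation to read other response fields later without changing the signature.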



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.

Review Comment:
   Yes, this was just a revert to the original comments and logic. But this makes sense; I can try to clarify the cases. Thanks for the comment suggestion.





[GitHub] [kafka] satishd commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "satishd (via GitHub)" <gi...@apache.org>.
satishd commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100998652


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   No, this is always a static field unless it is loaded with `LogContext`.





[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1114995703


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -27,9 +27,11 @@ import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+//import org.apache.kafka.common.requests.FetchRequest.PartitionData

Review Comment:
   Should we remove this import?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   Well, `buildRemoteLog()` does two things: (1) it calls `fetcher.truncateFullyAndStartAt` and (2) it sets `replicaState.logStartOffset`. For (1), since the truncation logic has moved to TierStateMachine, it should probably be done in `MockTierStateMachine.start()` directly. For (2), the existing test doesn't need to set `replicaState.logStartOffset`, so it seems unnecessary. If we address both (1) and (2), the callback is not needed.



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -424,6 +406,13 @@ abstract class AbstractFetcherThread(name: String,
                   if (onPartitionFenced(topicPartition, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
 
+                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
+                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
+                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch, partitionData)) {
+                    partitionsWithError += topicPartition
+                  }

Review Comment:
   No need for braces around single-line statements.



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -794,17 +742,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   This is the fetch response data. Also, could we add the missing param?





[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "rittikaadhikari (via GitHub)" <gi...@apache.org>.
rittikaadhikari commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1097985124


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.

Review Comment:
   nit: Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                     partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))
-                    partitionsWithError += topicPartition
+

Review Comment:
   nit: extra line



##########
core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+import java.util.Optional;
+
+public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {

Review Comment:
   nit: add Javadoc



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition
    */
   private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
                                                 fetchState: PartitionFetchState,
-                                                leaderEpochInRequest: Optional[Integer],
-                                                leaderLogStartOffset: Long): Boolean = {
+                                                fetchPartitonData: PartitionData): Boolean = {
+    val leaderEpochInRequest = fetchPartitonData.currentLeaderEpoch
     try {
-      val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
-        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
+      val newFetchState = fetchTierStateMachine.start(topicPartition, fetchState, fetchPartitonData);
+
+      // No-op for now
+      // newFetchState = fetchTierStateMachine.maybeAdvanceState(topicPartition, newFetchState).get()

Review Comment:
   Maybe mark this as a TODO instead? For example:
   ```
   // TODO: use fetchTierStateMachine.maybeAdvanceState when implementing async tiering logic in KAFKA-13560
   ```



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition

Review Comment:
   nit: fetchPartitionData



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,

Review Comment:
   nit: add Javadoc for this function



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -821,8 +868,9 @@ class AbstractFetcherThreadTest {
   def testCorruptMessage(): Unit = {
     val partition = new TopicPartition("topic", 0)
 
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
+    val mockLeaderEndPoint = new MockLeaderEndPoint {
       var fetchedOnce = false
+

Review Comment:
   nit: extra new line



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<EpochEntry>("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter$.MODULE$);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();
+                // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+                // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+                // Assign nextOffset with the offset from which next fetch should happen.
+                nextOffset = remoteLogSegmentMetadata.endOffset() + 1;
+
+                // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+                Partition partition = replicaMgr.getPartitionOrException(topicPartition);
+                partition.truncateFullyAndStartAt(nextOffset, false);
+
+                // Build leader epoch cache.
+                log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented$.MODULE$);
+                Seq<EpochEntry> epochs = JavaConverters.asScala(readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)).toSeq();
+                if (log.leaderEpochCache().isDefined()) {
+                    log.leaderEpochCache().get().assign(epochs);
+                }
+
+                LOGGER.debug("Updated the epoch cache from remote tier till offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), partition);
+
+                // Restore producer snapshot
+                File snapshotFile = UnifiedLog.producerSnapshotFile(log.dir(), nextOffset);
+                buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm);
+
+                // Reload producer snapshots.
+                log.producerStateManager().truncateFullyAndReloadSnapshots();
+                log.loadProducerState(nextOffset);
+                LOGGER.debug("Built the leader epoch cache and producer snapshots from remote tier for {}, " +
+                                "with active producers size: {}, leaderLogStartOffset: {}, and logEndOffset: {}",
+                        partition, log.producerStateManager().activeProducers().size(), leaderLogStartOffset, nextOffset);
+            } else {
+                throw new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
+                        ", currentLeaderEpoch: " + currentLeaderEpoch +
+                        ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
+                        ", leaderLogStartOffset: " + leaderLogStartOffset +
+                        ", epoch: " + targetEpoch +
+                        "as the previous remote log segment metadata was not found");
+            }
+        } else {
+            // If the tiered storage is not enabled throw an exception back so tht it will retry until the tiered storage

Review Comment:
   nit: typo, `tht` should be `that`



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<EpochEntry>("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter$.MODULE$);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else

Review Comment:
   nit: add `{}` around the else branch so that it matches the if branch
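
   For illustration, a minimal sketch of the branch with braces on both arms. Plain `int`/`long` parameters stand in for the `EpochEndOffset` fields used in the diff, and the class and method names are hypothetical, not part of the PR. Unlike the diff, the sketch takes the earlier epoch values as arguments even when the epoch is 0, where the real code never fetches them.

```java
// Sketch only: ints and longs stand in for the EpochEndOffset fields used in the diff.
final class TargetEpochSketch {

    /**
     * Picks the epoch used to look up remote segment metadata for
     * previousOffset, i.e. (leaderLocalLogStartOffset - 1).
     */
    static int resolveTargetEpoch(int epochForLeaderLocalLogStartOffset,
                                  int earlierEpoch,
                                  long earlierEpochEndOffset,
                                  long previousOffset) {
        int targetEpoch;
        if (epochForLeaderLocalLogStartOffset == 0) {
            // Epoch 0 has no earlier epoch, so the previous offset shares it.
            targetEpoch = epochForLeaderLocalLogStartOffset;
        } else {
            // The earlier epoch's end offset is exclusive.
            if (earlierEpochEndOffset > previousOffset) {
                targetEpoch = earlierEpoch;
            } else {
                targetEpoch = epochForLeaderLocalLogStartOffset;
            }
        }
        return targetEpoch;
    }

    public static void main(String[] args) {
        // Gap example from the code comment: local log start offset 90 is in epoch 3,
        // and the leader reports epoch 1 with an exclusive end offset of 90.
        System.out.println(resolveTargetEpoch(3, 1, 90L, 89L)); // prints 1
    }
}
```

   With the values from the gap example in the code comment, offset 89 resolves to epoch 1 rather than epoch 3.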



##########
core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+import java.util.Optional;
+
+public class ReplicaAlterLogDirsTierStateMachine implements TierStateMachine {
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     FetchRequest.PartitionData fetchPartitionData) throws Exception {
+        // JBOD is not supported with tiered storage.
+        throw new UnsupportedOperationException("Building remote log aux state not supported in ReplicaAlterLogDirsThread.");

Review Comment:
   nit: the message is missing "is": "Building remote log aux state is not supported in ReplicaAlterLogDirsThread."



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,

Review Comment:
   Please add Javadoc for this method.
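
   One possible shape for that Javadoc, as a sketch only. The wording is a suggestion based on the PR description, and the generic `S` and the `String` partition argument are hypothetical stand-ins for `PartitionFetchState` and `TopicPartition` so that the example compiles on its own.

```java
import java.util.Optional;

// Sketch only: S stands in for PartitionFetchState and String for TopicPartition.
interface TierStateMachineSketch<S> {

    /**
     * Optionally advances the state of the tier state machine for the given partition.
     *
     * @param topicPartition    the topic partition whose state may be advanced
     * @param currentFetchState the current fetch state of the partition
     * @return the resulting fetch state; a synchronous implementation that does all of
     *         its work in start can return the current state unchanged
     */
    Optional<S> maybeAdvanceState(String topicPartition, S currentFetchState);
}

final class SynchronousSketch<S> implements TierStateMachineSketch<S> {
    @Override
    public Optional<S> maybeAdvanceState(String topicPartition, S currentFetchState) {
        // No-op, mirroring the diff: the current state is returned as-is.
        return Optional.of(currentFetchState);
    }
}
```

   Documenting the no-op behaviour on the interface would make it clear why the synchronous implementation never changes the state.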



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,19 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitonData the fetch request data for this topic partition
    */
   private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
                                                 fetchState: PartitionFetchState,
-                                                leaderEpochInRequest: Optional[Integer],
-                                                leaderLogStartOffset: Long): Boolean = {
+                                                fetchPartitonData: PartitionData): Boolean = {

Review Comment:
   nit: typo, `fetchPartitonData` should be `fetchPartitionData` (both here and in the `@param` tag above)



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr,
+                                          Integer fetchBackOffMs) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+        this.fetchBackOffMs = fetchBackOffMs;
+    }
+
+
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderStartOffset._1;
+        long leaderStartOffset = (long) epochAndLeaderStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<EpochEntry>("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter$.MODULE$);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    protected Long buildRemoteLogAuxState(TopicPartition topicPartition,

Review Comment:
   Does this API still need to be protected?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1334,6 +1399,33 @@ class AbstractFetcherThreadTest {
     }
   }
 
+  class MockTierStateMachine(leader: LeaderEndPoint,

Review Comment:
   Consider giving `fetchBackOffMs` a default value of 0.



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -779,14 +823,17 @@ class AbstractFetcherThreadTest {
   @Test
   def testRetryAfterUnknownLeaderEpochInLatestOffsetFetch(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
+    val mockLeaderEndPoint = new MockLeaderEndPoint {
       val tries = new AtomicInteger(0)
+

Review Comment:
   nit: extra new line



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -664,16 +705,14 @@ class AbstractFetcherThreadTest {
   def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
     val partition = new TopicPartition("topic", 0)
     var isErrorHandled = false
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
-      override protected def buildRemoteLogAuxState(partition: TopicPartition,
-                                                    currentLeaderEpoch: Int,
-                                                    fetchOffset: Long,
-                                                    epochForFetchOffset: Int,
-                                                    leaderLogStartOffset: Long): Long = {
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint, 0) {
+      override def start(topicPartition: TopicPartition, currentFetchState: PartitionFetchState, fetchPartitionData: FetchRequest.PartitionData): PartitionFetchState = {
         isErrorHandled = true
-        throw new FencedLeaderEpochException(s"Epoch $currentLeaderEpoch is fenced")
+        throw new FencedLeaderEpochException(s"Epoch ${currentFetchState.currentLeaderEpoch} if fenced")

Review Comment:
   nit: typo, `if fenced` should be `is fenced`





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?





[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099947908


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   What does the returned boolean indicate?



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,

Review Comment:
   Have you considered refactoring this method to make it easier to test? This could also go in a separate PR, to avoid changing too much in a single refactor.
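
One possible shape for such a refactor, as a minimal sketch only: the target-epoch selection inside `buildRemoteLogAuxState` depends on nothing but four numbers, so it could be pulled into a pure function and tested without a `LeaderEndPoint` or a `RemoteLogManager`. The class `TargetEpochSelector` and the method `selectTargetEpoch` are hypothetical names, not part of the PR; the caller is assumed to pass in the values it already obtains from `fetchEarlierEpochEndOffset`.

```java
/**
 * Hypothetical helper (not part of the PR): the target-epoch decision from
 * buildRemoteLogAuxState, isolated as a pure function for unit testing.
 */
final class TargetEpochSelector {

    private TargetEpochSelector() { }

    /**
     * @param epochForLeaderLocalLogStartOffset epoch of the leader's local log start offset
     * @param leaderLocalLogStartOffset         the leader's local log start offset
     * @param earlierEpoch                      leader epoch returned by the lookup for (epoch - 1);
     *                                          ignored when the epoch is 0
     * @param earlierEpochEndOffset             exclusive end offset of that earlier epoch;
     *                                          ignored when the epoch is 0
     * @return the leader epoch of offset (leaderLocalLogStartOffset - 1)
     */
    static int selectTargetEpoch(int epochForLeaderLocalLogStartOffset,
                                 long leaderLocalLogStartOffset,
                                 int earlierEpoch,
                                 long earlierEpochEndOffset) {
        // Epoch 0 has no earlier epoch, so the previous offset shares it.
        if (epochForLeaderLocalLogStartOffset == 0) {
            return 0;
        }
        long previousOffset = leaderLocalLogStartOffset - 1;
        // The end offset is exclusive: the previous offset belongs to the
        // earlier epoch only if that epoch ends strictly after it.
        return earlierEpochEndOffset > previousOffset
                ? earlierEpoch
                : epochForLeaderLocalLogStartOffset;
    }

    public static void main(String[] args) {
        // Epoch cache from the code comment: 0 -> 20, 1 -> 85, (2 is a gap), 3 -> 90, 4 -> 98.
        // Local log start offset 90 is in epoch 3; the lookup for epoch 2
        // yields (epoch 1, end offset 90), so offset 89 maps to epoch 1.
        System.out.println(selectTargetEpoch(3, 90L, 1, 90L)); // prints 1
        // Local log start offset 95 is also in epoch 3; offset 94 is not
        // covered by the earlier epoch (90 > 94 is false), so it stays in epoch 3.
        System.out.println(selectTargetEpoch(3, 95L, 1, 90L)); // prints 3
    }
}
```

With this split, `buildRemoteLogAuxState` would keep the RPC and remote storage calls, and the gap-handling rule could be covered by plain unit tests.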



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {

Review Comment:
   Note: if the rlmMetadata is unavailable for an extended period of time, the replica fetcher will keep retrying indefinitely to construct the starting fetch state for the partition. Every attempt sends `OffsetForLeaderEpoch` and `ListOffsets` requests. If a large number of partitions are impacted, this generates unnecessary inter-broker traffic on the cluster, although the impact should be marginal most of the time. As an optimization, we could store the leader epoch associated with the leader's local log start offset - 1, which was retrieved here (we would still, however, need to query the leader for its local log start offset on every iteration).
   
   The asynchronous resolution of the correct fetch state from remote storage (KAFKA-13560) will prevent the extra load on the replica fetcher thread itself. The consideration above applies to the RPCs which are made on the synchronous fetch path.
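
To make the optimization concrete, here is a rough sketch under stated assumptions, not code from the PR. `TargetEpochCache` is a hypothetical class; the `IntSupplier` stands in for the `OffsetForLeaderEpoch` round trip, and the partition is keyed by a plain string to keep the example self-contained. The cached epoch is reused only while both the current leader epoch and the leader's local log start offset are unchanged, and the cache is assumed to be accessed from a single fetcher thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntSupplier;

/** Hypothetical sketch: remembers the epoch resolved for (local log start offset - 1). */
final class TargetEpochCache {

    private static final class Entry {
        final int currentLeaderEpoch;
        final long leaderLocalLogStartOffset;
        final int targetEpoch;

        Entry(int currentLeaderEpoch, long leaderLocalLogStartOffset, int targetEpoch) {
            this.currentLeaderEpoch = currentLeaderEpoch;
            this.leaderLocalLogStartOffset = leaderLocalLogStartOffset;
            this.targetEpoch = targetEpoch;
        }
    }

    // Not thread-safe: assumed to be owned by one fetcher thread.
    private final Map<String, Entry> entries = new HashMap<>();

    /**
     * Returns the cached target epoch when the inputs match the last
     * resolution; otherwise runs the (expensive) lookup and caches its result.
     */
    int targetEpoch(String partition,
                    int currentLeaderEpoch,
                    long leaderLocalLogStartOffset,
                    IntSupplier epochLookup) {
        Entry cached = entries.get(partition);
        if (cached != null
                && cached.currentLeaderEpoch == currentLeaderEpoch
                && cached.leaderLocalLogStartOffset == leaderLocalLogStartOffset) {
            return cached.targetEpoch;
        }
        int epoch = epochLookup.getAsInt();
        entries.put(partition, new Entry(currentLeaderEpoch, leaderLocalLogStartOffset, epoch));
        return epoch;
    }

    /** Drops the entry once the fetch state has been built or the partition is removed. */
    void remove(String partition) {
        entries.remove(partition);
    }
}
```

The local log start offset still has to be fetched from the leader on every retry, since it is part of the cache check; only the epoch lookup is skipped.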



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();

Review Comment:
   nit: declare `partitionsWithEpochs` with the interface type `Map` rather than `HashMap`.
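
   A minimal sketch of the suggested declaration, not the PR's code. `String` and `Integer` are stand-ins (assumptions) for `TopicPartition` and `OffsetForLeaderPartition` so that the snippet compiles on its own, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder class; only the declaration style matters here.
class MapDeclarationSketch {
    // String/Integer are stand-ins for TopicPartition/OffsetForLeaderPartition.
    static Map<String, Integer> buildPartitionsWithEpochs(String partition, int previousEpoch) {
        // The variable is typed as the Map interface; HashMap appears only at construction.
        Map<String, Integer> partitionsWithEpochs = new HashMap<>();
        partitionsWithEpochs.put(partition, previousEpoch);
        return partitionsWithEpochs;
    }
}
```

   Typing the variable as `Map` keeps callers independent of the concrete implementation, which can then be swapped without touching the rest of the method.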





[GitHub] [kafka] junrao commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100515746


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   This seems a bit weird. The logger is typically an instance-level object.



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset) throws IOException, RemoteStorageException {
+
+        UnifiedLog log = replicaMgr.localLogOrException(topicPartition);
+
+        long nextOffset;
+
+        if (log.remoteStorageSystemEnable() && log.config().remoteLogConfig.remoteStorageEnable) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.

Review Comment:
   This is an existing issue, but could we change "with in" to "within"?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1334,6 +1397,31 @@ class AbstractFetcherThreadTest {
     }
   }
 
+  class MockTierStateMachine(leader: LeaderEndPoint) extends ReplicaFetcherTierStateMachine(leader, null) {
+
+    var startCallback: (TopicPartition, Long) => Unit = (_,_) => {}

Review Comment:
   Could this be private?



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2<Object, Object> epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2<Object, Object> fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = (long) fetchLatestOffsetResult._2;
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch());
+    }
+
+    /**
+     * This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the original PartitionFetchState
+     */
+    public Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
+                                                           PartitionFetchState currentFetchState) {
+        // No-op for now
+        return Optional.of(currentFetchState);
+    }
+
+    private EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                      TopicPartition partition,
+                                                      Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        HashMap<TopicPartition, OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+
+        Option<EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.asScala(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(File snapshotFile,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        File tmpSnapshotFile = new File(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), snapshotFile.toPath(), false);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader.
+     */

Review Comment:
   This is an existing issue, but could we add a comment describing the return value?
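
   A sketch of what such a comment might look like, offered as an illustration only. The `@return` wording is an assumption inferred from the call site in `start`, where the result is stored in `offsetToFetch`. The interface below is hypothetical and exists only to host the Javadoc, and `String` stands in for `TopicPartition`.

```java
// Hypothetical interface; in the PR the method is a private member of
// ReplicaFetcherTierStateMachine and takes a TopicPartition.
interface RemoteLogAuxStateBuilderSketch {
    /**
     * Tries to build the required state for this partition from the leader and remote
     * storage so that the follower can start fetching records from the leader.
     *
     * @return the next offset the follower should fetch from the leader
     *         (assumed from how start() uses the result)
     */
    long buildRemoteLogAuxState(String topicPartition,
                                int currentLeaderEpoch,
                                long leaderLocalLogStartOffset,
                                int epochForLeaderLocalLogStartOffset,
                                long leaderLogStartOffset);
}
```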



##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   Has this been removed?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   Here, we want to update the replicaState in the MockFetcherThread. Would it be better to call `fetcher.replicaPartitionState` to get the replicaState explicitly?

   Also, adding a callback in a mock class seems a bit complicated. Alternatively, we could override `doWork()` in MockFetcherThread so that it updates its replicaPartitionState from the partitionState in AbstractFetcherThread after each call. This way, we could get rid of the callback in `MockTierStateMachine`.
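
   A rough sketch of the `doWork()` idea, written in Java for brevity even though the test itself is Scala. Every class, field, and value below is a hypothetical stand-in rather than the real AbstractFetcherThread or MockFetcherThread API. It only shows the shape: the mock overrides `doWork()`, delegates to the base class, then copies the base class's state into its own replica-side view.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AbstractFetcherThread: tracks a fetch offset per partition.
class SketchFetcherThread {
    protected final Map<String, Long> fetchOffsets = new HashMap<>();

    // One unit of work; here it only simulates moving every partition to offset 5.
    void doWork() {
        fetchOffsets.replaceAll((partition, offset) -> 5L);
    }
}

// Hypothetical stand-in for MockFetcherThread: keeps its own replica-side view.
class SketchMockFetcherThread extends SketchFetcherThread {
    final Map<String, Long> replicaLogStartOffsets = new HashMap<>();

    @Override
    void doWork() {
        super.doWork();
        // Sync the mock's replica state from the base class after every call,
        // so no callback has to be passed through the tier state machine.
        replicaLogStartOffsets.putAll(fetchOffsets);
    }
}
```

   With this shape the test drives the fetcher through `doWork()` and asserts on the mock's replica state directly, without registering a callback.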





[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100802687


##########
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   I'll remove the `static` modifier.
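
   For context, the synchronous behaviour described in the Javadoc quoted above can be sketched as follows. This is a simplified illustration, not the code in this PR: `SketchFetchState`, `SketchTierStateMachine`, and `SynchronousTierStateMachine` are hypothetical names, the method signatures are assumptions, the leader's local log start offset is passed in rather than fetched, and `java.util.logging` stands in for slf4j. The logger is an instance field, in line with dropping `static`.

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical, simplified fetch state: only the next offset to fetch.
final class SketchFetchState {
    final long fetchOffset;

    SketchFetchState(long fetchOffset) {
        this.fetchOffset = fetchOffset;
    }
}

// Simplified shape of an interface with start and maybeAdvanceState;
// the parameter lists here are assumptions made for the sketch.
interface SketchTierStateMachine {
    SketchFetchState start(String topicPartition,
                           SketchFetchState current,
                           long leaderLocalLogStartOffset);

    Optional<SketchFetchState> maybeAdvanceState(String topicPartition,
                                                 SketchFetchState current);
}

final class SynchronousTierStateMachine implements SketchTierStateMachine {
    // Instance logger, so each machine can carry its own prefix.
    private final Logger log;

    SynchronousTierStateMachine(String logPrefix) {
        this.log = Logger.getLogger(logPrefix + SynchronousTierStateMachine.class.getName());
    }

    @Override
    public SketchFetchState start(String topicPartition,
                                  SketchFetchState current,
                                  long leaderLocalLogStartOffset) {
        // A real implementation would build the follower's remote log aux
        // state up to the leader's local log start offset at this point.
        log.fine("Building aux state for " + topicPartition
            + " up to offset " + leaderLocalLogStartOffset);
        // All work is done synchronously, so fetching can resume right away.
        return new SketchFetchState(leaderLocalLogStartOffset);
    }

    @Override
    public Optional<SketchFetchState> maybeAdvanceState(String topicPartition,
                                                        SketchFetchState current) {
        // Nothing is pending after start(), so the state is returned unchanged.
        return Optional.of(current);
    }
}
```

   An asynchronous variant would presumably return an intermediate state from `start` and do the real work in `maybeAdvanceState`, which is why both methods are kept on the interface.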





[GitHub] [kafka] mattwong949 commented on pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on PR #13206:
URL: https://github.com/apache/kafka/pull/13206#issuecomment-1430302196

   Sorry for the delay on the open comments. I'll be able to reply to and iterate on them by tomorrow.




[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

Posted by "mattwong949 (via GitHub)" <gi...@apache.org>.
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116325758


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -794,17 +742,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   Ah, thanks for the catches. I have fixed those.


