Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/09 20:08:09 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

junrao commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1043982291


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -289,13 +296,26 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
 
+  private[kafka] var _localLogStartOffset: Long = logStartOffset

Review Comment:
   Does this need to be volatile?
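
   A minimal sketch of what that would look like (hypothetical, mirroring how logStartOffset itself is declared as a @volatile field in this class):

      @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset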



##########
clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
+    private final InputStream inputStream;
+    // LogHeader buffer up to magic.
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+    public RemoteLogInputStream(InputStream inputStream) {
+        this.inputStream = inputStream;
+    }
+
+    @Override
+    public RecordBatch nextBatch() throws IOException {
+        logHeaderBuffer.clear();
+        Utils.readFully(inputStream, logHeaderBuffer);
+
+        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+
+        logHeaderBuffer.rewind();
+        int size = logHeaderBuffer.getInt(SIZE_OFFSET);
+
+        // V0 has the smallest overhead, stricter checking is done later
+        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+                                                                   "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));
+
+        // 'size' includes, 4 bytes + magic + size(batch-records). So, the complete batch buffer including the header

Review Comment:
   The comment is a bit confusing since 4 bytes + magic doesn't add up to LOG_OVERHEAD.
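
   For reference, a sketch of the intended arithmetic (an assumption about what the code below is doing: the size field counts everything after itself, while Records.LOG_OVERHEAD covers the 8-byte offset field plus the 4-byte size field):

      // complete batch = offset field + size field + the bytes counted by the size field
      val completeBatchSize = Records.LOG_OVERHEAD + size
      val batchBuffer = ByteBuffer.allocate(completeBatchSize)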



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
+    // in connecting to the brokers or remote storages.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no
+   * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the
+   * task to respective target state(leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partitions logs are available when this callback is invoked.
+    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Deletes the internal topic partition info if delete flag is set as true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Delete from internal datastructures only if it is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")

Review Comment:
   I guess we will add the logic to delete the remote data later?
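
   If so, a rough sketch of that follow-up might look like the following (hypothetical wiring; it also skips the RemoteLogMetadataManager delete-state bookkeeping that a real implementation would need):

      if (delete) {
        val topicId = topicPartitionIds.remove(topicPartition)
        if (topicId != null) {
          val topicIdPartition = new TopicIdPartition(topicId, topicPartition)
          // Remove the copied segments from remote storage as well.
          remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition).asScala.foreach { segmentMetadata =>
            remoteLogStorageManager.deleteLogSegmentData(segmentMetadata)
          }
        }
      }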



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -574,6 +594,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     explicitMetricName(pkgStr, "Log", name, tags)
   }
 
+  def loadProducerState(lastOffset: Long): Unit = lock synchronized {
+    rebuildProducerState(lastOffset, producerStateManager)
+    maybeIncrementFirstUnstableOffset()

Review Comment:
   Should we reset HWM too?
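
   If so, a minimal sketch of that addition (hypothetical; it assumes UnifiedLog's existing updateHighWatermark and localLog.logEndOffsetMetadata helpers):

      def loadProducerState(lastOffset: Long): Unit = lock synchronized {
        rebuildProducerState(lastOffset, producerStateManager)
        maybeIncrementFirstUnstableOffset()
        // Hypothetical: reset the high watermark after rebuilding state, as suggested above.
        updateHighWatermark(localLog.logEndOffsetMetadata)
      }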



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -381,9 +398,14 @@ abstract class AbstractFetcherThread(name: String,
                       markPartitionFailed(topicPartition)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                  if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                    partitionsWithError += topicPartition
+                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
+                    s"fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")

Review Comment:
   It would be useful to make the log message a complete sentence, so something like:
   debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE} at " +
     s"fetch offset: ${currentFetchState.fetchOffset} for " + s"topic-partition: $topicPartition")



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1861,6 +1861,39 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     MetadataVersion.MINIMUM_KRAFT_VERSION
   }
 
+  val fetchRequestVersion: Short =

Review Comment:
   This seems unused?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -702,6 +718,102 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
+      (_, leaderLogStartOffset) => {
+        truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
+        leaderLogStartOffset
+      },
+      // In this case, it will fetch from leader's log-start-offset like earlier instead of fetching from
+      // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not.
+      // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in
+      // OffsetMovedToTieredStorage error and it will handle building the remote log state.
+      fetchFromLocalLogStartOffset = false)
+  }
+
+  /**
+   * Handles the out of range error for the given topic partition.
+   *
+   * Returns true if
+   *    - the request succeeded or
+   *    - it was fenced and this thread haven't received new epoch, which means we need not backoff and retry as the

Review Comment:
   haven't => hasn't



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -565,6 +587,11 @@ class BrokerServer(
       if (logManager != null)
         CoreUtils.swallow(logManager.shutdown(), this)
 
+      // Close remote log manager before stopping processing requests, to give a chance to any

Review Comment:
   We already stopped processing requests at this point, right?



##########
core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala:
##########
@@ -52,14 +52,22 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     * Assigns the supplied Leader Epoch to the supplied Offset
     * Once the epoch is assigned it cannot be reassigned
     */
-  def assign(epoch: Int, startOffset: Long): Unit = {
+  def assign(epoch: Int, startOffset: Long, flushToFile: Boolean = true): Unit = {

Review Comment:
   It seems no non-test caller sets flushToFile to false?



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -1103,18 +1195,24 @@ class AbstractFetcherThreadTest {
       }.toMap
     }
 
-    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      leaderState.logStartOffset
+      (leaderState.leaderEpoch, leaderState.logStartOffset)
     }
 
-    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      leaderState.logEndOffset
+      (leaderState.leaderEpoch, leaderState.logEndOffset)
     }
 
+    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+      val leaderState = leaderPartitionState(topicPartition)
+      checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+      (leaderState.leaderEpoch, leaderState.localLogStartOffset)
+
+    }

Review Comment:
   add newline after



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -741,6 +740,30 @@ class ProducerStateManagerTest {
     assertEquals(Set(1), currentSnapshotOffsets)
   }
 
+  @Test
+  def testReloadSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 1, 1L)
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
+    val pathAndDataList = logDir.listFiles().map(file => (file.toPath, Files.readAllBytes(file.toPath)))
+
+    append(stateManager, producerId, epoch, 3, 3L)
+    append(stateManager, producerId, epoch, 4, 4L)
+    stateManager.takeSnapshot()
+    assertEquals(2, logDir.listFiles().length)
+    assertEquals(Set(3, 5), currentSnapshotOffsets)
+
+    stateManager.truncateAndReload(3, 5, time.milliseconds())
+    assertEquals(1, logDir.listFiles().length)
+    assertEquals(Set(5), currentSnapshotOffsets)
+
+    pathAndDataList.foreach(e => Files.write(e._1, e._2))

Review Comment:
   Could we use case statement to avoid using unnamed references?
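
   For example, a small sketch of the suggestion:

      pathAndDataList.foreach { case (path, data) => Files.write(path, data) }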



##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -741,6 +740,30 @@ class ProducerStateManagerTest {
     assertEquals(Set(1), currentSnapshotOffsets)
   }
 
+  @Test
+  def testReloadSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 1, 1L)
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
+    val pathAndDataList = logDir.listFiles().map(file => (file.toPath, Files.readAllBytes(file.toPath)))
+
+    append(stateManager, producerId, epoch, 3, 3L)
+    append(stateManager, producerId, epoch, 4, 4L)
+    stateManager.takeSnapshot()
+    assertEquals(2, logDir.listFiles().length)
+    assertEquals(Set(3, 5), currentSnapshotOffsets)
+
+    stateManager.truncateAndReload(3, 5, time.milliseconds())
+    assertEquals(1, logDir.listFiles().length)
+    assertEquals(Set(5), currentSnapshotOffsets)
+
+    pathAndDataList.foreach(e => Files.write(e._1, e._2))
+    stateManager.reloadSnapshots()
+    assertEquals(Some(3), stateManager.latestSnapshotOffset)

Review Comment:
   Hmm, I am not sure that I understand the test. The snapshot corresponding to offset 5 should still be there, right? Where did we delete it?



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log.{OffsetIndex, OffsetPosition, TimeIndex, UnifiedLog}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+
+import java.io.{File, FileInputStream}
+import java.nio.file.Files
+import java.util.Collections
+import scala.collection.mutable
+
+class RemoteIndexCacheTest {
+
+  val time = new MockTime()
+  val partition = new TopicPartition("foo", 0)
+  val idPartition = new TopicIdPartition(Uuid.randomUuid(), partition)
+  val logDir: File = TestUtils.tempDirectory("kafka-logs")
+  val tpDir: File = new File(logDir, partition.toString)
+  val brokerId = 1
+  val baseOffset = 45L
+  val lastOffset = 75L
+  val segmentSize = 1024
+
+  val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+  val cache: RemoteIndexCache =  new RemoteIndexCache(remoteStorageManager = rsm, logDir = logDir.toString)
+  val remoteLogSegmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid())
+  val rlsMetadata: RemoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
+    time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+  @BeforeEach
+  def setup(): Unit = {
+    Files.createDirectory(tpDir.toPath)
+    val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
+    txnIdxFile.createNewFile()
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
+        val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 8)
+        val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 12)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+  }
+
+  @AfterEach
+  def cleanup(): Unit = {
+    reset(rsm)
+    cache.entries.forEach((_, v) => v.cleanup())
+    cache.close()
+  }
+
+  @Test
+  def testFetchIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val offsetPosition1 = offsetIndex.entry(1)
+    // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+    val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
+    assertEquals(offsetPosition1.position, resultPosition)
+    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+
+    // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
+    reset(rsm)
+    val offsetPosition2 = offsetIndex.entry(2)
+    val resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset)
+    assertEquals(offsetPosition2.position, resultPosition2)
+    assertNotNull(cache.getIndexEntry(rlsMetadata))
+    verifyNoInteractions(rsm)
+  }
+
+  @Test
+  def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
+    val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
+    assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
+
+    // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than least entry in the offset index.
+    val nonExistentOffsetPosition = OffsetPosition(baseOffset, 0)
+    val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1
+    assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset))
+  }
+
+  @Test
+  def testCacheEntryExpiry(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    cache.getIndexEntry(metadataList.head)
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(1, cache.entries.size())
+    verifyFetchIndexInvocation(count = 1)
+
+    cache.getIndexEntry(metadataList.head)
+    cache.getIndexEntry(metadataList(1))
+    assertEquals(2, cache.entries.size())
+    verifyFetchIndexInvocation(count = 2)
+
+    cache.getIndexEntry(metadataList.last)
+    cache.getIndexEntry(metadataList(1))
+    assertEquals(2, cache.entries.size())
+    assertTrue(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+    assertTrue(cache.entries.containsKey(metadataList(1).remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(count = 3)
+
+    cache.getIndexEntry(metadataList(1))
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(2, cache.entries.size())
+    assertFalse(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(count = 4)
+  }
+
+  @Test
+  def testGetIndexAfterCacheClose(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    cache.getIndexEntry(metadataList.head)
+    cache.getIndexEntry(metadataList.head)

Review Comment:
   Why are we calling the same thing a second time? Ditto in two other places below.



##########
core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote

Review Comment:
   Should this be in the storage module like ClassLoaderAwareRemoteLogMetadataManager?



##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -386,11 +397,147 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the leader
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated")
+
+        val rlm = replicaMgr.remoteLogManager.get
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
+        val targetEpoch: Int = {
+          // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+          // will have the same epoch.
+          if (epochForLeaderLocalLogStartOffset == 0) {
+            epochForLeaderLocalLogStartOffset
+          } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+            // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) {
+              // Always use the leader epoch from returned earlierEpochEndOffset.
+              // This gives the respective leader epoch, that will handle any gaps in epochs.
+              // For ex, leader epoch cache contains:
+              // leader-epoch   start-offset
+              //  0 		          20
+              //  1 		          85
+              //  <2> - gap no messages were appended in this leader epoch.
+              //  3 		          90
+              //  4 		          98
+              // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+              // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+              // So, for offset 89, we should return leader epoch as 1 like below.
+              earlierEpochEndOffset.leaderEpoch()
+            } else epochForLeaderLocalLogStartOffset
+          }
+        }
+
+        val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+        if (maybeRlsm.isPresent) {
+          val remoteLogSegmentMetadata = maybeRlsm.get()
+          // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+          // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+          val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+          val epochStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)
+          val epochs = readLeaderEpochCheckpoint(epochStream)
+
+          // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+          truncateFullyAndStartAt(partition, nextOffset)
+
+          log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+          log.leaderEpochCache.foreach { cache =>
+            epochs.foreach(epochEntry =>
+              // Do not flush to file for each entry.
+              cache.assign(epochEntry.epoch, epochEntry.startOffset, flushToFile = false)
+            )
+            // Flush the cache to the file.
+            cache.flush()
+          }
+
+          debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+            s"with size: ${epochs.size} for $partition")
+
+          // Restore producer snapshot
+          val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset)
+          val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp");
+          // Copy it to snapshot file in atomic manner.
+          Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+            tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+          Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+
+          // Reload producer snapshots.
+          log.producerStateManager.reloadSnapshots()
+          log.loadProducerState(nextOffset, reloadFromCleanShutdown = false)
+          debug(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " +
+            s"Active producers: ${log.producerStateManager.activeProducers.size}, " +
+            s"LeaderLogStartOffset: $leaderLogStartOffset, endOffset: $nextOffset")
+
+          // Return the offset from which next fetch should happen.
+          nextOffset
+        } else {
+          throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " +
+            s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " +
+            s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $targetEpoch as the previous remote log segment " +
+            s"metadata was not found")
+        }
+
+      } else {

Review Comment:
   Yes, thinking about this more, I am not sure that truncating all local logs is the right thing to do. If we do that, the replica's log may not give a complete view of the data. We could get into this situation when (1) the topic-level remote storage flag propagation is delayed, or (2) the user misconfigures the cluster (e.g. remoteStorageSystemEnable is not enabled on all brokers). In both cases, it seems a better strategy is to error out and keep retrying. In case (1), the issue will be resolved automatically when the topic-level flag is propagated to this broker. In case (2), the issue will be resolved after the user fixes the configuration.
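
   A rough sketch of that alternative (hypothetical, replacing the truncate-and-restart branch; the exact exception type and message are assumptions):

      } else {
        // Remote storage is not (yet) enabled for this log: do not truncate the local log.
        // Fail the state rebuild and let the fetcher thread retry; this resolves itself once the
        // topic-level flag propagates, or once the user fixes the broker configuration.
        throw new RemoteStorageException(s"Couldn't build the remote log auxiliary state for $partition " +
          s"because remote storage is not enabled for this log; the fetcher will retry.")
      }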



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1280,6 +1308,14 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           case _ => Optional.empty[Integer]()
         }
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
+      } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
+        val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
+          cache.epochForOffset(_localLogStartOffset).flatMap(cache.epochEntry))
+        val epochOpt = earliestLocalLogEpochEntry match {
+          case Some(entry) if entry.startOffset <= _localLogStartOffset => Optional.of[Integer](entry.epoch)
+          case _ => Optional.empty[Integer]()
+        }
+        Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, _localLogStartOffset, epochOpt))

Review Comment:
   _localLogStartOffset could change after we find out the epoch. Perhaps we could save it as a local val and use it in the return value.
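
   Something like this, as a minimal sketch of the suggestion:

      } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
        // Read the field once into a local val so the epoch lookup and the returned offset agree.
        val curLocalLogStartOffset = _localLogStartOffset
        val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
          cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry))
        val epochOpt = earliestLocalLogEpochEntry match {
          case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch)
          case _ => Optional.empty[Integer]()
        }
        Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))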



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
+    // in connecting to the brokers or remote storages.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no
+   * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the
+   * task to respective target state(leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partitions logs are available when this callback is invoked.
+    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Deletes the internal topic partition info if delete flag is set as true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Delete from internal datastructures only if it is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+    }
+  }
+
+  def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+                                    epochForOffset: Int,
+                                    offset: Long): Optional[RemoteLogSegmentMetadata] = {
+    val topicId = topicPartitionIds.get(topicPartition)
+
+    if (topicId == null) {
+      throw new KafkaException("No topic id registered for topic partition: " + topicPartition)
+    }
+
+    remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+  }
+
+  private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+    val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+
+    var remoteSegInputStream: InputStream = null
+    try {
+      // Search forward for the position of the last offset that is greater than or equal to the target offset
+      remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos)
+      val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream)
+      var batch: RecordBatch = null
+
+      def nextBatch(): RecordBatch = {
+        batch = remoteLogInputStream.nextBatch()
+        batch
+      }
+
+      while (nextBatch() != null) {
+        if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) {
+          batch.iterator.asScala.foreach(record => {
+            if (record.timestamp >= timestamp && record.offset >= startingOffset)
+              return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch)))
+          })
+        }
+      }
+      None
+    } finally {
+      Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream")
+    }
+  }
+
+  private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = {
+    if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+      Optional.empty()
+    else
+      Optional.of(leaderEpoch)
+  }
+
+  /**
+   * Search the message offset in the remote storage based on timestamp and offset.
+   *
+   * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
+   *
+   * - If there are no messages in the remote storage, return None
+   * - If all the messages in the remote storage have smaller offsets, return None
+   * - If all the messages in the remote storage have smaller timestamps, return None
+   * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp
+   * the returned offset will be max(the earliest offset in the remote storage, startingOffset) and the timestamp will
+   * be the first message's timestamp.
+   * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+   * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset.
+   *
+   * @param tp               topic partition in which the offset to be found.
+   * @param timestamp        The timestamp to search for.
+   * @param startingOffset   The starting offset to search.
+   * @param leaderEpochCache LeaderEpochFileCache of the topic partition.
+   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there
+   *         is no such message.
+   */
+  def findOffsetByTimestamp(tp: TopicPartition,
+                            timestamp: Long,
+                            startingOffset: Long,
+                            leaderEpochCache: LeaderEpochFileCache): Option[TimestampAndOffset] = {
+    val topicId = topicPartitionIds.get(tp)
+    if (topicId == null) {
+      throw new KafkaException("Topic id does not exist for topic partition: " + tp)
+    }
+
+    // Get the respective epoch in which the starting-offset exists.
+    val startingOffsetEpoch = leaderEpochCache.epochForOffset(startingOffset)
+    var maybeEpoch = startingOffsetEpoch;
+    while (maybeEpoch.nonEmpty) {
+      val epoch = maybeEpoch.get
+      remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, tp), epoch).asScala
+        .foreach(rlsMetadata =>
+          if (rlsMetadata.maxTimestampMs() >= timestamp && rlsMetadata.endOffset() >= startingOffset) {
+            val timestampOffset = lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+            if (timestampOffset.isDefined)
+              return timestampOffset
+          }
+        )
+
+      // Move to the next epoch if not found with the current epoch.
+      maybeEpoch = leaderEpochCache.nextEpoch(epoch)
+    }
+    None

Review Comment:
   If no message has a timestamp, we will fall through to here. This doesn't seem to implement what the comment says.



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
+    // in connecting to the brokers or remote storages.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no
+   * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the
+   * task to respective target state(leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partitions logs are available when this callback is invoked.
+    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Deletes the internal topic partition info if delete flag is set as true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Delete from internal datastructures only if it is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+    }
+  }
+
+  def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+                                    epochForOffset: Int,
+                                    offset: Long): Optional[RemoteLogSegmentMetadata] = {
+    val topicId = topicPartitionIds.get(topicPartition)
+
+    if (topicId == null) {
+      throw new KafkaException("No topic id registered for topic partition: " + topicPartition)
+    }
+
+    remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+  }
+
+  private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+    val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+
+    var remoteSegInputStream: InputStream = null
+    try {
+      // Search forward for the position of the last offset that is greater than or equal to the target offset
+      remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos)
+      val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream)
+      var batch: RecordBatch = null
+
+      def nextBatch(): RecordBatch = {
+        batch = remoteLogInputStream.nextBatch()
+        batch
+      }
+
+      while (nextBatch() != null) {
+        if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) {
+          batch.iterator.asScala.foreach(record => {
+            if (record.timestamp >= timestamp && record.offset >= startingOffset)
+              return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch)))
+          })
+        }
+      }
+      None
+    } finally {
+      Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream")
+    }
+  }
+
+  private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = {
+    if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+      Optional.empty()
+    else
+      Optional.of(leaderEpoch)
+  }
+
+  /**
+   * Search the message offset in the remote storage based on timestamp and offset.
+   *
+   * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
+   *
+   * - If there are no messages in the remote storage, return None
+   * - If all the messages in the remote storage have smaller offsets, return None
+   * - If all the messages in the remote storage have smaller timestamps, return None
+   * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp
+   * the returned offset will be max(the earliest offset in the remote storage, startingOffset) and the timestamp will
+   * be the first message's timestamp.
+   * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+   * is greater than or equals to the target timestamp and whose offset is greater than or equals to the startingOffset.
+   *
+   * @param tp               topic partition in which the offset to be found.
+   * @param timestamp        The timestamp to search for.
+   * @param startingOffset   The starting offset to search.
+   * @param leaderEpochCache LeaderEpochFileCache of the topic partition.
+   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there
+   *         is no such message.
+   */
+  def findOffsetByTimestamp(tp: TopicPartition,
+                            timestamp: Long,
+                            startingOffset: Long,
+                            leaderEpochCache: LeaderEpochFileCache): Option[TimestampAndOffset] = {
+    val topicId = topicPartitionIds.get(tp)
+    if (topicId == null) {
+      throw new KafkaException("Topic id does not exist for topic partition: " + tp)
+    }
+
+    // Get the respective epoch in which the starting-offset exists.
+    val startingOffsetEpoch = leaderEpochCache.epochForOffset(startingOffset)

Review Comment:
   Could we just assign to maybeEpoch directly and get rid of startingOffsetEpoch?
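   A minimal sketch of the suggested simplification, assuming (the rest of the method is not in this hunk, so the surrounding names are hypothetical) that `startingOffsetEpoch` is only copied into a `maybeEpoch` value used further down:

       // hypothetical: look up the epoch once and bind it to the name used downstream,
       // dropping the intermediate `startingOffsetEpoch` val
       val maybeEpoch = leaderEpochCache.epochForOffset(startingOffset)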



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
+    // in connecting to the brokers or remote storages.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no
+   * existing tasks for a given topic partition then it will assign new leader or follower task else it will convert the
+   * task to respective target state(leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partitions logs are available when this callback is invoked.
+    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Deletes the internal topic partition info if delete flag is set as true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Delete from internal datastructures only if it is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+    }
+  }
+
+  def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+                                    epochForOffset: Int,
+                                    offset: Long): Optional[RemoteLogSegmentMetadata] = {
+    val topicId = topicPartitionIds.get(topicPartition)
+
+    if (topicId == null) {
+      throw new KafkaException("No topic id registered for topic partition: " + topicPartition)
+    }
+
+    remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+  }
+
+  private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+    val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+
+    var remoteSegInputStream: InputStream = null
+    try {
+      // Search forward for the position of the last offset that is greater than or equal to the target offset

Review Comment:
   target offset => start offset?



##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {

Review Comment:
   Could this be private?
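   For illustration, the change would only touch the signature; a sketch with the body elided, assuming the helper is used only inside ReplicaFetcherThread (the body is shown in the following hunk):

       private def buildProducerSnapshotFile(snapshotFile: File,
                                             remoteLogSegmentMetadata: RemoteLogSegmentMetadata,
                                             rlm: RemoteLogManager): Unit = {
         // ... body unchanged from the diff ...
       }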



##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to snapshot file in atomic manner.
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+  }
+
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the leader
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated")
+
+        val rlm = replicaMgr.remoteLogManager.get
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
+        val targetEpoch: Int = {
+          // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+          // will have the same epoch.
+          if (epochForLeaderLocalLogStartOffset == 0) {
+            epochForLeaderLocalLogStartOffset
+          } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+            // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) {
+              // Always use the leader epoch from returned earlierEpochEndOffset.
+              // This gives the respective leader epoch, that will handle any gaps in epochs.
+              // For ex, leader epoch cache contains:
+              // leader-epoch   start-offset
+              //  0 		          20
+              //  1 		          85
+              //  <2> - gap no messages were appended in this leader epoch.
+              //  3 		          90
+              //  4 		          98
+              // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+              // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.

Review Comment:
   Should fetchEarlierEpochEndOffset(2) be fetchEarlierEpochEndOffset(90)?
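   For reference, a small standalone sketch (not the PR's code) that reproduces the epoch table from the comment above and shows why offset 89 resolves to epoch 1 when epoch 2 is empty:

       object EpochGapExample extends App {
         // leader-epoch -> start-offset, with a gap at epoch 2 (no messages appended in that epoch)
         val epochStartOffsets = Seq(0 -> 20L, 1 -> 85L, 3 -> 90L, 4 -> 98L)

         // The epoch for an offset is the last entry whose start-offset is <= the offset.
         def epochForOffset(offset: Long): Int =
           epochStartOffsets.takeWhile { case (_, start) => start <= offset }.last._1

         println(epochForOffset(90)) // 3: leaderLocalLogStartOffset 90 belongs to epoch 3
         println(epochForOffset(89)) // 1: the previous offset falls back to epoch 1, as the comment says
       }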



##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to snapshot file in atomic manner.
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+  }
+
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the leader
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated")
+
+        val rlm = replicaMgr.remoteLogManager.get
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
+        val targetEpoch: Int = {
+          // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+          // will have the same epoch.
+          if (epochForLeaderLocalLogStartOffset == 0) {
+            epochForLeaderLocalLogStartOffset
+          } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+            // Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) {
+              // Always use the leader epoch from returned earlierEpochEndOffset.
+              // This gives the respective leader epoch, that will handle any gaps in epochs.
+              // For ex, leader epoch cache contains:
+              // leader-epoch   start-offset
+              //  0 		          20
+              //  1 		          85
+              //  <2> - gap no messages were appended in this leader epoch.
+              //  3 		          90
+              //  4 		          98
+              // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+              // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+              // So, for offset 89, we should return leader epoch as 1 like below.
+              earlierEpochEndOffset.leaderEpoch()
+            } else epochForLeaderLocalLogStartOffset
+          }
+        }
+
+        val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+        if (maybeRlsm.isPresent) {
+          val remoteLogSegmentMetadata = maybeRlsm.get()
+          // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+          // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+          val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+
+          // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+          truncateFullyAndStartAt(partition, nextOffset)
+
+          // Build leader epoch cache.
+          log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+          val epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)
+          log.leaderEpochCache.foreach { cache =>
+            cache.assign(epochs)
+          }
+
+          debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+            s"with size: ${epochs.size} for $partition")
+
+          // Restore producer snapshot
+          val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset)
+          buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm)
+
+          // Reload producer snapshots.
+          log.producerStateManager.reloadSnapshots()
+          log.loadProducerState(nextOffset)
+          debug(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " +
+            s"Active producers: ${log.producerStateManager.activeProducers.size}, " +
+            s"LeaderLogStartOffset: $leaderLogStartOffset, endOffset: $nextOffset")

Review Comment:
   Could we use a complete sentence? e.g.
   s"Active producers with size of ${log.producerStateManager.activeProducers.size}, " +
   s"logStartOffset is $leaderLogStartOffset and logEndOffset is $nextOffset")



##########
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##########
@@ -614,12 +613,103 @@ class AbstractFetcherThreadTest {
     assertEquals(0L, replicaState.highWatermark)
   }
 
+  @Test
+  def testFollowerFetchMovedToTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
+
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
+    // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else Option(Truncating)

Review Comment:
   truncateOnFetch is always true?



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetadataManager {

Review Comment:
   Could we add a description of the class?
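   For example, something along these lines (wording is only a suggestion, assuming the class follows the same delegation pattern as ClassLoaderAwareRemoteStorageManager):

       /**
        * A wrapper of {@link RemoteLogMetadataManager} that switches the thread context class loader
        * to the given class loader before delegating each call to the underlying instance, and
        * restores the previous context class loader afterwards.
        */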



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log.{OffsetIndex, OffsetPosition, TimeIndex, UnifiedLog}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+
+import java.io.{File, FileInputStream}
+import java.nio.file.Files
+import java.util.Collections
+import scala.collection.mutable
+
+class RemoteIndexCacheTest {
+
+  val time = new MockTime()
+  val partition = new TopicPartition("foo", 0)
+  val idPartition = new TopicIdPartition(Uuid.randomUuid(), partition)
+  val logDir: File = TestUtils.tempDirectory("kafka-logs")
+  val tpDir: File = new File(logDir, partition.toString)
+  val brokerId = 1
+  val baseOffset = 45L
+  val lastOffset = 75L
+  val segmentSize = 1024
+
+  val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+  val cache: RemoteIndexCache =  new RemoteIndexCache(remoteStorageManager = rsm, logDir = logDir.toString)
+  val remoteLogSegmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid())
+  val rlsMetadata: RemoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
+    time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+  @BeforeEach
+  def setup(): Unit = {
+    Files.createDirectory(tpDir.toPath)
+    val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
+    txnIdxFile.createNewFile()
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
+        val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 8)
+        val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 12)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+  }
+
+  @AfterEach
+  def cleanup(): Unit = {
+    reset(rsm)
+    cache.entries.forEach((_, v) => v.cleanup())
+    cache.close()
+  }
+
+  @Test
+  def testFetchIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val offsetPosition1 = offsetIndex.entry(1)
+    // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+    val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
+    assertEquals(offsetPosition1.position, resultPosition)
+    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+
+    // this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
+    reset(rsm)
+    val offsetPosition2 = offsetIndex.entry(2)
+    val resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset)
+    assertEquals(offsetPosition2.position, resultPosition2)
+    assertNotNull(cache.getIndexEntry(rlsMetadata))
+    verifyNoInteractions(rsm)
+  }
+
+  @Test
+  def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
+    val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
+    assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
+
+    // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than least entry in the offset index.
+    val nonExistentOffsetPosition = OffsetPosition(baseOffset, 0)
+    val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1
+    assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset))
+  }
+
+  @Test
+  def testCacheEntryExpiry(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    cache.getIndexEntry(metadataList.head)
+    cache.getIndexEntry(metadataList.head)

Review Comment:
   Why are we calling the same thing a second time?



##########
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##########
@@ -245,4 +253,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   private def sendRequest(leaderId: Int, request: ListOffsetsRequest): ListOffsetsResponse = {
     connectAndReceive[ListOffsetsResponse](request, destination = brokerSocketServer(leaderId))
   }
+
+  def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = {

Review Comment:
   Could this be private?



##########
core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala:
##########
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.log.{OffsetIndex, TimeIndex, UnifiedLog}
+import kafka.server.KafkaConfig
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.Mockito._
+import org.junit.jupiter.api.Assertions._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.ArgumentMatchers.{any, anyInt, anyLong}
+
+import java.io.{ByteArrayInputStream, File, FileInputStream}
+import java.nio.file.Files
+import java.util.{Optional, Properties}
+import java.util
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class RemoteLogManagerTest {
+
+  val time = new MockTime()
+  val brokerId = 0
+  val logDir: String = TestUtils.tempDirectory("kafka-").toString
+
+  val remoteStorageManager: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+  val remoteLogMetadataManager: RemoteLogMetadataManager = mock(classOf[RemoteLogMetadataManager])
+  var remoteLogManagerConfig: RemoteLogManagerConfig = _
+  var remoteLogManager: RemoteLogManager = _
+
+  val leaderTopicIdPartition =  new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0))
+  val followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0))
+  val topicIds: util.Map[String, Uuid] = Map(
+    leaderTopicIdPartition.topicPartition().topic() -> leaderTopicIdPartition.topicId(),
+    followerTopicIdPartition.topicPartition().topic() -> followerTopicIdPartition.topicId()
+  ).asJava
+
+  val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    var epochs: Seq[EpochEntry] = Seq()
+    override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = new Properties()
+    remoteLogManagerConfig = createRLMConfig(props)
+    remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir) {
+      override private[remote] def createRemoteStorageManager() = remoteStorageManager
+      override private[remote] def createRemoteLogMetadataManager() = remoteLogMetadataManager
+    }
+  }
+
+  @Test
+  def testRemoteLogMetadataManagerWithUserDefinedConfigs(): Unit = {
+    val key = "key"
+    val configPrefix = "config.prefix"
+    val props: Properties = new Properties()
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, configPrefix)
+    props.put(configPrefix + key, "world")
+    props.put("remote.log.metadata.y", "z")
+
+    val metadataMangerConfig = createRLMConfig(props).remoteLogMetadataManagerProps()
+    assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key))
+    assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"))
+  }
+
+  @Test
+  def testStartup(): Unit = {
+    remoteLogManager.startup()
+    val capture: ArgumentCaptor[util.Map[String, _]] = ArgumentCaptor.forClass(classOf[util.Map[String, _]])
+    verify(remoteStorageManager, times(1)).configure(capture.capture())
+    assertEquals(brokerId, capture.getValue.get(KafkaConfig.BrokerIdProp))
+
+    verify(remoteLogMetadataManager, times(1)).configure(capture.capture())
+    assertEquals(brokerId, capture.getValue.get(KafkaConfig.BrokerIdProp))
+    assertEquals(logDir, capture.getValue.get(KafkaConfig.LogDirProp))
+  }
+
+  @Test
+  def testGetClassLoaderAwareRemoteStorageManager(): Unit = {
+    val rsmManager: ClassLoaderAwareRemoteStorageManager = mock(classOf[ClassLoaderAwareRemoteStorageManager])
+    val remoteLogManager =
+      new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir) {
+        override private[remote] def createRemoteStorageManager(): ClassLoaderAwareRemoteStorageManager = rsmManager
+      }
+    assertEquals(rsmManager, remoteLogManager.storageManager())
+  }
+
+  @Test
+  def testTopicIdCacheUpdates(): Unit = {
+    def verifyInCache(topicIdPartitions: TopicIdPartition*): Unit = {
+      topicIdPartitions.foreach { topicIdPartition =>
+        assertDoesNotThrow(() =>
+          remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), epochForOffset = 0, offset = 0L))
+      }
+    }
+
+    def verifyNotInCache(topicIdPartitions: TopicIdPartition*): Unit = {
+      topicIdPartitions.foreach { topicIdPartition =>
+        assertThrows(classOf[KafkaException], () =>
+          remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), epochForOffset = 0, offset = 0L))
+      }
+    }
+
+    val mockLeaderPartition = mockPartition(leaderTopicIdPartition)
+    val mockFollowerPartition = mockPartition(followerTopicIdPartition)
+
+    when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(classOf[TopicIdPartition]), anyInt(), anyLong()))
+      .thenReturn(Optional.empty[RemoteLogSegmentMetadata]())
+    verifyNotInCache(followerTopicIdPartition, leaderTopicIdPartition)
+    // Load topicId cache
+    remoteLogManager.onLeadershipChange(Set(mockLeaderPartition), Set(mockFollowerPartition), topicIds)
+    verify(remoteLogMetadataManager, times(1))
+      .onPartitionLeadershipChanges(Set(leaderTopicIdPartition).asJava, Set(followerTopicIdPartition).asJava)
+    verifyInCache(followerTopicIdPartition, leaderTopicIdPartition)
+
+    // Evicts from topicId cache
+    remoteLogManager.stopPartitions(leaderTopicIdPartition.topicPartition(), delete = true)
+    verifyNotInCache(leaderTopicIdPartition)
+    verifyInCache(followerTopicIdPartition)
+
+    // Evicts from topicId cache
+    remoteLogManager.stopPartitions(followerTopicIdPartition.topicPartition(), delete = true)
+    verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition)
+  }
+
+  @Test
+  def testFetchRemoteLogSegmentMetadata(): Unit = {
+    remoteLogManager.onLeadershipChange(
+      Set(mockPartition(leaderTopicIdPartition)), Set(mockPartition(followerTopicIdPartition)), topicIds)
+    remoteLogManager.fetchRemoteLogSegmentMetadata(leaderTopicIdPartition.topicPartition(), 10, 100L)
+    remoteLogManager.fetchRemoteLogSegmentMetadata(followerTopicIdPartition.topicPartition(), 20, 200L)
+
+    verify(remoteLogMetadataManager)
+      .remoteLogSegmentMetadata(ArgumentMatchers.eq(leaderTopicIdPartition), anyInt(), anyLong())
+    verify(remoteLogMetadataManager)
+      .remoteLogSegmentMetadata(ArgumentMatchers.eq(followerTopicIdPartition), anyInt(), anyLong())
+  }
+
+  @Test
+  def testFindOffsetByTimestamp(): Unit = {
+    val tp = leaderTopicIdPartition.topicPartition()
+    val remoteLogSegmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid())
+    val timestamp = time.milliseconds()
+    val startOffset = 120
+    val targetLeaderEpoch = 10
+
+    val segmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+    when(segmentMetadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId)
+    when(segmentMetadata.maxTimestampMs()).thenReturn(timestamp + 2)
+    when(segmentMetadata.startOffset()).thenReturn(startOffset)
+    when(segmentMetadata.endOffset()).thenReturn(startOffset + 2)
+
+    val tpDir: File = new File(logDir, tp.toString)
+    Files.createDirectory(tpDir.toPath)
+    val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
+    txnIdxFile.createNewFile()
+    when(remoteStorageManager.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer { ans =>
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
+        val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 8)
+        val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 12)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+          case IndexType.LEADER_EPOCH =>
+          case IndexType.PRODUCER_SNAPSHOT =>
+        }
+      }
+
+    when(remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(leaderTopicIdPartition), anyInt()))
+      .thenAnswer(ans => {
+        val leaderEpoch = ans.getArgument[Int](1)
+        if (leaderEpoch == targetLeaderEpoch)
+          List(segmentMetadata).asJava.iterator()
+        else
+          List().asJava.iterator()
+      })
+    when(remoteStorageManager.fetchLogSegment(segmentMetadata, 0))
+      .thenReturn(new ByteArrayInputStream(records(timestamp, startOffset, targetLeaderEpoch).buffer().array()))
+
+    val leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint)
+    leaderEpochFileCache.assign(epoch = 5, startOffset = 99L)
+    leaderEpochFileCache.assign(epoch = targetLeaderEpoch, startOffset = startOffset)
+    leaderEpochFileCache.assign(epoch = 12, startOffset = 500L)
+
+    remoteLogManager.onLeadershipChange(Set(mockPartition(leaderTopicIdPartition)), Set(), topicIds)
+    val actual = remoteLogManager.findOffsetByTimestamp(tp, timestamp, startOffset, leaderEpochFileCache)
+    assertEquals(Some(new TimestampAndOffset(timestamp + 1, startOffset + 1, Optional.of(targetLeaderEpoch))), actual)

Review Comment:
   Hmm, the log segment only has 1 record with timestamp and startOffset. Why do we return timestamp + 1 here?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3474,6 +3535,57 @@ class UnifiedLogTest {
     // bound by the log end offset
     assertEquals(None, log.maybeUpdateHighWatermark(101L))
   }
+  def testEnableRemoteLogStorageOnCompactedTopics(): Unit = {

Review Comment:
   Add a blank line above this method.



##########
core/src/main/scala/kafka/log/AbstractIndex.scala:
##########
@@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils
  * @param maxIndexSize The maximum index size in bytes.
  */
 abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
-                             val writable: Boolean) extends Closeable {
+                             val writable: Boolean) {

Review Comment:
   Same question here. It seems better to make it explicit that AbstractIndex is Closeable?



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receives any leader and follower replica events and partition stop events and act on them
+ *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig
+ * @param brokerId
+ * @param logDir
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This may start resources within those managers,
+    // such as connections to brokers or remote storages.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there is no
+   * existing task for a given topic partition, a new leader or follower task is assigned; otherwise the existing
+   * task is converted to the respective target state (leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partition logs are available when this callback is invoked.
+    // Compacted topics and internal topics are filtered out here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc. here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Stops copying segments and building indexes for the given partition, and deletes the partition from remote
+   * storage if the delete flag is set to true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partition is to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Remove the partition from internal data structures only if it is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+    }
+  }
+
+  def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+                                    epochForOffset: Int,
+                                    offset: Long): Optional[RemoteLogSegmentMetadata] = {
+    val topicId = topicPartitionIds.get(topicPartition)
+
+    if (topicId == null) {
+      throw new KafkaException("No topic id registered for topic partition: " + topicPartition)
+    }
+
+    remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+  }
+
+  private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+    val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+
+    var remoteSegInputStream: InputStream = null
+    try {
+      // Search forward for the first record whose timestamp is >= the target timestamp and whose offset is >= startingOffset.
+      remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos)
+      val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream)
+      var batch: RecordBatch = null
+
+      def nextBatch(): RecordBatch = {
+        batch = remoteLogInputStream.nextBatch()
+        batch
+      }
+
+      while (nextBatch() != null) {
+        if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) {
+          batch.iterator.asScala.foreach(record => {
+            if (record.timestamp >= timestamp && record.offset >= startingOffset)
+              return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch)))
+          })
+        }
+      }
+      None
+    } finally {
+      Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream")
+    }
+  }
+
+  private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = {
+    if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+      Optional.empty()
+    else
+      Optional.of(leaderEpoch)
+  }
+
+  /**
+   * Search the message offset in the remote storage based on timestamp and offset.
+   *
+   * This method returns an option of TimestampAndOffset. The returned value is determined using the following ordered list of rules:
+   *
+   * - If there are no messages in the remote storage, return None
+   * - If all the messages in the remote storage have smaller offsets, return None
+   * - If all the messages in the remote storage have smaller timestamps, return None
+   * - If all the messages in the remote storage have larger timestamps, or no message in the remote storage has a timestamp

Review Comment:
   Do we need to make "all the messages in the remote storage have larger timestamps" a special case here? It seems the last option covers that case already.
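
   For illustration, a minimal sketch of the scan semantics in `lookupTimestamp`, assuming records are reduced to (timestamp, offset) pairs; `firstMatch` is a hypothetical helper, not part of this PR:

   ```scala
   // Hypothetical helper (not part of this PR): returns the first record whose
   // timestamp is >= targetTimestamp and whose offset is >= startingOffset.
   // A segment whose records all have larger timestamps is already handled by
   // this rule, since its first in-range record matches.
   def firstMatch(records: Iterator[(Long, Long)], // (timestamp, offset) pairs
                  targetTimestamp: Long,
                  startingOffset: Long): Option[(Long, Long)] =
     records.find { case (ts, off) => ts >= targetTimestamp && off >= startingOffset }

   // Example: a target timestamp of 150 with all record timestamps larger still yields a result.
   // firstMatch(Iterator((200L, 5L), (300L, 6L)), targetTimestamp = 150L, startingOffset = 0L)
   //   => Some((200L, 5L))
   ```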



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org