You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/11 18:25:30 UTC

[kafka] branch trunk updated: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e2f8787137 KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)
7e2f8787137 is described below

commit 7e2f87871371d4d88187f73f43090488612ef42a
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Tue Jul 11 23:55:23 2023 +0530

    KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)
    
    KAFKA-14522 Rewrite and Move of RemoteIndexCache to storage module.
    Cleanedup index file suffix usages and other minor cleanups
    
    Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Kamal Chandraprakash<ka...@gmail.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>
---
 build.gradle                                       |   5 +-
 checkstyle/import-control.xml                      |   1 +
 checkstyle/suppressions.xml                        |   2 +-
 .../java/kafka/log/remote/RemoteLogManager.java    |   4 +-
 core/src/main/scala/kafka/log/LocalLog.scala       |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   6 +-
 .../scala/kafka/log/remote/RemoteIndexCache.scala  | 448 --------------
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   5 +-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    |  59 +-
 .../storage/internals/log/RemoteIndexCache.java    | 671 +++++++++++++++++++++
 10 files changed, 713 insertions(+), 490 deletions(-)

diff --git a/build.gradle b/build.gradle
index eef7aa68abd..afdd4863dcc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -875,7 +875,6 @@ project(':core') {
 
 
     implementation libs.argparse4j
-    implementation libs.caffeine
     implementation libs.commonsValidator
     implementation libs.jacksonDatabind
     implementation libs.jacksonModuleScala
@@ -930,7 +929,8 @@ project(':core') {
     testImplementation(libs.jfreechart) {
       exclude group: 'junit', module: 'junit'
     }
-
+    testImplementation libs.caffeine
+    
     generator project(':generator')
   }
 
@@ -1703,6 +1703,7 @@ project(':storage') {
     implementation project(':storage:api')
     implementation project(':server-common')
     implementation project(':clients')
+    implementation libs.caffeine
     implementation libs.slf4jApi
     implementation libs.jacksonDatabind
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ee8355fef12..67b8ea08d84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -269,6 +269,7 @@
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="com.github.benmanes.caffeine.cache" />
   </subpackage>
 
   <subpackage name="shell">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6e2a8c0ca4e..3f2df5a32b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
     <suppress checks="CyclomaticComplexity"
               files="(LogValidator|RemoteLogManagerConfig).java"/>
     <suppress checks="NPathComplexity"
-              files="LogValidator.java"/>
+              files="(LogValidator|RemoteIndexCache).java"/>
     <suppress checks="ParameterNumber"
               files="(LogAppendInfo|RemoteLogManagerConfig).java"/>
 
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 7e528a6bd39..6831230287b 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -58,11 +58,13 @@ import org.apache.kafka.storage.internals.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.OffsetPosition;
+import org.apache.kafka.storage.internals.log.RemoteIndexCache;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
 import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
 import org.apache.kafka.storage.internals.log.TransactionIndex;
 import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -160,7 +162,7 @@ public class RemoteLogManager implements Closeable {
                             String logDir,
                             String clusterId,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) throws IOException {
         this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
         this.logDir = logDir;
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 34442e3c3f5..b90178d5c5d 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -593,7 +593,7 @@ class LocalLog(@volatile private var _dir: File,
 object LocalLog extends Logging {
 
   /** a file that is scheduled to be deleted */
-  private[log] val DeletedFileSuffix = ".deleted"
+  private[log] val DeletedFileSuffix = LogFileUtils.DELETED_FILE_SUFFIX
 
   /** A temporary file that is being used for log cleaning */
   private[log] val CleanedFileSuffix = ".cleaned"
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5e20d916310..913661fd4f4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -17,8 +17,6 @@
 
 package kafka.log
 
-import kafka.log.remote.RemoteIndexCache
-
 import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
@@ -43,7 +41,7 @@ import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
 
 import scala.annotation.nowarn
 
@@ -397,7 +395,7 @@ class LogManager(logDirs: Seq[File],
           logDir.isDirectory &&
             // Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
             // but not any topic-partition dir.
-            !logDir.getName.equals(RemoteIndexCache.DirName) &&
+            !logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
             UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         numTotalLogs += logsToLoad.length
         numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
deleted file mode 100644
index 7c035c1a336..00000000000
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log.remote
-
-import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause}
-import kafka.log.UnifiedLog
-import kafka.log.remote.RemoteIndexCache.{DirName, offsetFromRemoteIndexFileName, RemoteLogIndexCacheCleanerThread, remoteLogSegmentIdFromRemoteIndexFileName, remoteOffsetIndexFile, remoteTimeIndexFile, remoteTransactionIndexFile}
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{CoreUtils, Logging, threadsafe}
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.utils.{Utils, Time}
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
-import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
-import org.apache.kafka.server.util.ShutdownableThread
-
-import java.io.{File, InputStream}
-import java.nio.file.{FileAlreadyExistsException, Files, Path}
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-object RemoteIndexCache {
-  val DirName = "remote-log-index-cache"
-  val TmpFileSuffix = ".tmp"
-  val RemoteLogIndexCacheCleanerThread = "remote-log-index-cleaner"
-
-  def remoteLogSegmentIdFromRemoteIndexFileName(fileName: String): Uuid = {
-    val underscoreIndex = fileName.indexOf("_")
-    val dotIndex = fileName.indexOf(".")
-    Uuid.fromString(fileName.substring(underscoreIndex + 1, dotIndex))
-  }
-
-  def offsetFromRemoteIndexFileName(fileName: String): Long = {
-    fileName.substring(0, fileName.indexOf("_")).toLong
-  }
-
-  /**
-   * Generates prefix for file name for the on-disk representation of remote indexes.
-   *
-   * Example of file name prefix is 45_dsdsd where 45 represents the base offset for the segment and
-   * sdsdsd represents the unique [[RemoteLogSegmentId]]
-   *
-   * @param remoteLogSegmentMetadata remote segment for the remote indexes
-   * @return string which should be used as prefix for on-disk representation of remote indexes
-   */
-  private def generateFileNamePrefixForIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
-    val startOffset = remoteLogSegmentMetadata.startOffset
-    val segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id
-    // uuid.toString uses URL encoding which is safe for filenames and URLs.
-    s"${startOffset.toString}_${segmentId.toString}"
-  }
-
-  def remoteOffsetIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
-    new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata))
-  }
-
-  def remoteOffsetIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
-    val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
-    prefix + UnifiedLog.IndexFileSuffix
-  }
-
-  def remoteTimeIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
-    new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata))
-  }
-
-  def remoteTimeIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
-    val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
-    prefix + UnifiedLog.TimeIndexFileSuffix
-  }
-
-  def remoteTransactionIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
-    new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata))
-  }
-
-  def remoteTransactionIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
-    val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
-    prefix + UnifiedLog.TxnIndexFileSuffix
-  }
-}
-
-@threadsafe
-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) extends AutoCloseable {
-  // visible for testing
-  private[remote] var markedForCleanup = false
-  // visible for testing
-  private[remote] var cleanStarted = false
-  // This lock is used to synchronize cleanup methods and read methods. This ensures that cleanup (which changes the
-  // underlying files of the index) isn't performed while a read is in-progress for the entry. This is required in
-  // addition to using the thread safe cache because, while the thread safety of the cache ensures that we can read
-  // entries concurrently, it does not ensure that we won't mutate underlying files beloging to an entry.
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
-
-  def lookupOffset(targetOffset: Long): OffsetPosition = {
-    inReadLock(lock) {
-      if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
-      offsetIndex.lookup(targetOffset)
-    }
-  }
-
-  def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
-    inReadLock(lock) {
-      if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
-      val timestampOffset = timeIndex.lookup(timestamp)
-      offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
-    }
-  }
-
-  private[remote] def markForCleanup(): Unit = {
-    inWriteLock(lock) {
-      if (!markedForCleanup) {
-        markedForCleanup = true
-        Array(offsetIndex, timeIndex).foreach(index =>
-          index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX))))
-        // txn index needs to be renamed separately since it's not of type AbstractIndex
-        txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "",
-          LogFileUtils.DELETED_FILE_SUFFIX)))
-      }
-    }
-  }
-
-  /**
-   * Deletes the index files from the disk. Invoking #close is not required prior to this function.
-   */
-  private[remote] def cleanup(): Unit = {
-    inWriteLock(lock) {
-      markForCleanup()
-      // no-op if clean is done already
-      if (!cleanStarted) {
-        cleanStarted = true
-        CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
-      }
-    }
-  }
-
-  /**
-   * Calls the underlying close method for each index which may lead to releasing resources such as mmap.
-   * This function does not delete the index files.
-   */
-  @Override
-  def close(): Unit = {
-    inWriteLock(lock) {
-      // close is no-op if entry is already marked for cleanup. Mmap resources are released during cleanup.
-      if (!markedForCleanup) {
-        Utils.closeQuietly(offsetIndex, "offset index")
-        Utils.closeQuietly(timeIndex, "time index")
-        Utils.closeQuietly(txnIndex, "transaction index")
-      }
-    }
-  }
-
-  override def toString: String = {
-    s"RemoteIndexCacheEntry(" +
-      s"timeIndex=${timeIndex.file.getName}, " +
-      s"txnIndex=${txnIndex.file.getName}, " +
-      s"offsetIndex=${offsetIndex.file.getName})"
-  }
-}
-
-/**
- * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`.
- * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every
- * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available.
- *
- * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
- * the cache.
- *
- * Note that closing this cache does not delete the index files on disk.
- * Note that the cache eviction policy is based on the default implementation of Caffeine i.e.
- * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
- * sketch to probabilistically estimate the historic usage of an entry.
- *
- * @param maxSize              maximum number of segment index entries to be cached.
- * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
- * @param logDir               log directory
- */
-@threadsafe
-class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String)
-  extends Logging with AutoCloseable {
-  /**
-   * Directory where the index files will be stored on disk.
-   */
-  private val cacheDir = new File(logDir, DirName)
-  /**
-   * Represents if the cache is closed or not. Closing the cache is an irreversible operation.
-   */
-  private val isRemoteIndexCacheClosed: AtomicBoolean = new AtomicBoolean(false)
-  /**
-   * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
-   *
-   * Visible for testing
-   */
-  private[remote] val expiredIndexes = new LinkedBlockingQueue[Entry]()
-  /**
-   * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
-   * concurrent reads in-progress.
-   */
-  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
-  /**
-   * Actual cache implementation that this file wraps around.
-   *
-   * The requirements for this internal cache is as follows:
-   * 1. Multiple threads should be able to read concurrently.
-   * 2. Fetch for missing keys should not block read for available keys.
-   * 3. Only one thread should fetch for a specific key.
-   * 4. Should support LRU-like policy.
-   *
-   * We use [[Caffeine]] cache instead of implementing a thread safe LRU cache on our own.
-   *
-   * Visible for testing.
-   */
-  private[remote] var internalCache: Cache[Uuid, Entry] = Caffeine.newBuilder()
-    .maximumSize(maxSize)
-    // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
-    // evicted (means removal due to the policy)
-    .removalListener((key: Uuid, entry: Entry, _: RemovalCause) => {
-      // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
-      entry.markForCleanup()
-      if (!expiredIndexes.offer(entry)) {
-        error(s"Error while inserting entry $entry for key $key into the cleaner queue because queue is full.")
-      }
-    })
-    .build[Uuid, Entry]()
-
-  private def init(): Unit = {
-    val start = Time.SYSTEM.hiResClockMs()
-    try {
-      Files.createDirectory(cacheDir.toPath)
-      info(s"Created new file $cacheDir for RemoteIndexCache")
-    } catch {
-      case _: FileAlreadyExistsException =>
-        info(s"RemoteIndexCache directory $cacheDir already exists. Re-using the same directory.")
-      case e: Exception =>
-        error(s"Unable to create directory $cacheDir for RemoteIndexCache.", e)
-        throw e
-    }
-
-    // Delete any .deleted files remained from the earlier run of the broker.
-    Files.list(cacheDir.toPath).forEach((path: Path) => {
-      if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
-        if (Files.deleteIfExists(path))
-          debug(s"Deleted file $path on cache initialization")
-      }
-    })
-
-    Files.list(cacheDir.toPath).forEach((path:Path) => {
-      val indexFileName = path.getFileName.toString
-      val uuid = remoteLogSegmentIdFromRemoteIndexFileName(indexFileName)
-      // It is safe to update the internalCache non-atomically here since this function is always called by a single
-      // thread only.
-      if (!internalCache.asMap().containsKey(uuid)) {
-        val fileNameWithoutDotExtensions = indexFileName.substring(0, indexFileName.indexOf("."))
-        val offsetIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.IndexFileSuffix)
-        val timestampIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TimeIndexFileSuffix)
-        val txnIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TxnIndexFileSuffix)
-
-        // Create entries for each path if all the index files exist.
-        if (Files.exists(offsetIndexFile.toPath) &&
-            Files.exists(timestampIndexFile.toPath) &&
-            Files.exists(txnIndexFile.toPath)) {
-
-          val offset = offsetFromRemoteIndexFileName(indexFileName)
-          val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
-          offsetIndex.sanityCheck()
-
-          val timeIndex = new TimeIndex(timestampIndexFile, offset, Int.MaxValue, false)
-          timeIndex.sanityCheck()
-
-          val txnIndex = new TransactionIndex(offset, txnIndexFile)
-          txnIndex.sanityCheck()
-
-          internalCache.put(uuid, new Entry(offsetIndex, timeIndex, txnIndex))
-        } else {
-          // Delete all of them if any one of those indexes is not available for a specific segment id
-          Files.deleteIfExists(offsetIndexFile.toPath)
-          Files.deleteIfExists(timestampIndexFile.toPath)
-          Files.deleteIfExists(txnIndexFile.toPath)
-        }
-      }
-    })
-    info(s"RemoteIndexCache starts up in ${Time.SYSTEM.hiResClockMs() - start} ms.")
-  }
-
-  init()
-
-  // Start cleaner thread that will clean the expired entries
-  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(RemoteLogIndexCacheCleanerThread) {
-    setDaemon(true)
-
-    override def doWork(): Unit = {
-      var expiredEntryOpt: Option[Entry] = None
-      try {
-        expiredEntryOpt = Some(expiredIndexes.take())
-        expiredEntryOpt.foreach( expiredEntry => {
-          log.debug(s"Cleaning up index entry $expiredEntry")
-          expiredEntry.cleanup()
-        })
-      } catch {
-        case ie: InterruptedException =>
-          // cleaner thread should only be interrupted when cache is being closed, else it's an error
-          if (!isRemoteIndexCacheClosed.get()) {
-            log.error("Cleaner thread received interruption but remote index cache is not closed", ie)
-            // propagate the InterruptedException outside to correctly close the thread.
-            throw ie
-          } else {
-            log.debug("Cleaner thread was interrupted on cache shutdown")
-          }
-        // do not exit for exceptions other than InterruptedException
-        case ex: Throwable => log.error(s"Error occurred while cleaning up expired entry $expiredEntryOpt", ex)
-      }
-    }
-  }
-
-  cleanerThread.start()
-
-  def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
-    if (isRemoteIndexCacheClosed.get()) {
-      throw new IllegalStateException(s"Unable to fetch index for " +
-        s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
-    }
-
-    inReadLock(lock) {
-      // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
-      // check for index close again
-      if (isRemoteIndexCacheClosed.get()) {
-        throw new IllegalStateException(s"Unable to fetch index for " +
-          s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
-      }
-
-      val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
-      internalCache.get(cacheKey, (_: Uuid) => {
-        def loadIndexFile[T](indexFile: File,
-                             fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
-                             readIndex: File => T): T = {
-          def fetchAndCreateIndex(): T = {
-            val tmpIndexFile = new File(cacheDir, indexFile.getName + RemoteIndexCache.TmpFileSuffix)
-
-            val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
-            try {
-              Files.copy(inputStream, tmpIndexFile.toPath)
-            } finally {
-              if (inputStream != null) {
-                inputStream.close()
-              }
-            }
-
-            Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
-            readIndex(indexFile)
-          }
-
-          if (Files.exists(indexFile.toPath)) {
-            try {
-              readIndex(indexFile)
-            } catch {
-              case ex: CorruptRecordException =>
-                info(s"Error occurred while loading the stored index at ${indexFile.toPath}", ex)
-                fetchAndCreateIndex()
-            }
-          } else {
-            fetchAndCreateIndex()
-          }
-        }
-
-        val startOffset = remoteLogSegmentMetadata.startOffset()
-        val offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata)
-        val offsetIndex: OffsetIndex = loadIndexFile(offsetIndexFile,
-          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
-          file => {
-            val index = new OffsetIndex(file, startOffset, Int.MaxValue, false)
-            index.sanityCheck()
-            index
-          })
-
-        val timeIndexFile = remoteTimeIndexFile(cacheDir, remoteLogSegmentMetadata)
-        val timeIndex: TimeIndex = loadIndexFile(timeIndexFile,
-          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
-          file => {
-            val index = new TimeIndex(file, startOffset, Int.MaxValue, false)
-            index.sanityCheck()
-            index
-          })
-
-        val txnIndexFile = remoteTransactionIndexFile(cacheDir, remoteLogSegmentMetadata)
-        val txnIndex: TransactionIndex = loadIndexFile(txnIndexFile,
-          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION),
-          file => {
-            val index = new TransactionIndex(startOffset, file)
-            index.sanityCheck()
-            index
-          })
-
-        new Entry(offsetIndex, timeIndex, txnIndex)
-      })
-    }
-  }
-
-  def lookupOffset(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, offset: Long): Int = {
-    inReadLock(lock) {
-      getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position
-    }
-  }
-
-  def lookupTimestamp(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Int = {
-    inReadLock(lock) {
-      getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position
-    }
-  }
-
-  /**
-   * Close should synchronously cleanup the resources used by this cache.
-   * This index is closed when [[RemoteLogManager]] is closed.
-   */
-  def close(): Unit = {
-    // make close idempotent and ensure no more reads allowed from henceforth. The in-progress reads will continue to
-    // completion (release the read lock) and then close will begin executing. Cleaner thread will immediately stop work.
-    if (!isRemoteIndexCacheClosed.getAndSet(true)) {
-      inWriteLock(lock) {
-        info(s"Close initiated for RemoteIndexCache. Cache stats=${internalCache.stats}. " +
-          s"Cache entries pending delete=${expiredIndexes.size()}")
-        // Initiate shutdown for cleaning thread
-        val shutdownRequired = cleanerThread.initiateShutdown
-        // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
-        internalCache.asMap().forEach((_, entry) => entry.close())
-        // wait for cleaner thread to shutdown
-        if (shutdownRequired) cleanerThread.awaitShutdown()
-        // Note that internal cache does not require explicit cleaning / closing. We don't want to invalidate or cleanup
-        // the cache as both would lead to triggering of removal listener.
-        internalCache = null
-        info(s"Close completed for RemoteIndexCache")
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index dff07ce547d..e1c593fbca5 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,6 @@
 package kafka.log
 
 import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.log.remote.RemoteIndexCache
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.server.BrokerTopicStats
@@ -40,7 +39,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
 import java.util.{Collections, Properties}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
 
 import scala.collection.{Map, mutable}
 import scala.collection.mutable.ArrayBuffer
@@ -376,7 +375,7 @@ class LogManagerTest {
   @Test
   def testLoadLogsSkipRemoteIndexCache(): Unit = {
     val logDir = TestUtils.tempDir()
-    val remoteIndexCache = new File(logDir, RemoteIndexCache.DirName)
+    val remoteIndexCache = new File(logDir, RemoteIndexCache.DIR_NAME)
     remoteIndexCache.mkdir()
     logManager = createLogManager(Seq(logDir))
     logManager.loadLogs(logConfig, Map.empty)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index a39190f9ef3..b087676bed2 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -16,15 +16,14 @@
  */
 package kafka.log.remote
 
-import kafka.log.UnifiedLog
-import kafka.log.remote.RemoteIndexCache.{RemoteLogIndexCacheCleanerThread, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -64,7 +63,7 @@ class RemoteIndexCacheTest {
     rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
 
-    cache = new RemoteIndexCache(remoteStorageManager = rsm, logDir = tpDir.toString)
+    cache = new RemoteIndexCache(rsm, tpDir.toString)
 
     when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
       .thenAnswer(ans => {
@@ -72,12 +71,12 @@ class RemoteIndexCacheTest {
         val indexType = ans.getArgument[IndexType](1)
         val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
         val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txIdx = createTxIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
           case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
-          case IndexType.TRANSACTION => new FileInputStream(txIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
           case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
           case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
         }
@@ -94,7 +93,7 @@ class RemoteIndexCacheTest {
     Utils.delete(logDir)
     // Verify no lingering threads. It is important to have this as the very last statement in the @aftereach
     // because this may throw an exception and prevent cleanup after it
-    TestUtils.assertNoNonDaemonThreads(RemoteIndexCache.RemoteLogIndexCacheCleanerThread)
+    TestUtils.assertNoNonDaemonThreads(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD)
   }
 
   @Test
@@ -113,11 +112,11 @@ class RemoteIndexCacheTest {
     assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
 
     // assert that parent directory for the index files is correct
-    assertEquals(RemoteIndexCache.DirName, offsetIndexFile.getParent.getFileName.toString,
+    assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
       s"offsetIndex=$offsetIndexFile is created under incorrect parent")
-    assertEquals(RemoteIndexCache.DirName, txnIndexFile.getParent.getFileName.toString,
+    assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
       s"txnIndex=$txnIndexFile is created under incorrect parent")
-    assertEquals(RemoteIndexCache.DirName, timeIndexFile.getParent.getFileName.toString,
+    assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
       s"timeIndex=$timeIndexFile is created under incorrect parent")
   }
 
@@ -156,7 +155,7 @@ class RemoteIndexCacheTest {
   def testCacheEntryExpiry(): Unit = {
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
-    cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -201,7 +200,7 @@ class RemoteIndexCacheTest {
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
 
-    cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -242,9 +241,9 @@ class RemoteIndexCacheTest {
     val cacheEntry = generateSpyCacheEntry()
 
     // verify index files on disk
-    assertTrue(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
-    assertTrue(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
-    assertTrue(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")
+    assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
+    assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
+    assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")
 
     // add the spied entry into the cache, it will overwrite the non-spied entry
     cache.internalCache.put(internalIndexKey, cacheEntry)
@@ -256,9 +255,9 @@ class RemoteIndexCacheTest {
     cache.internalCache.invalidate(internalIndexKey)
 
     // wait until entry is marked for deletion
-    TestUtils.waitUntilTrue(() => cacheEntry.markedForCleanup,
+    TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
       "Failed to mark cache entry for cleanup after invalidation")
-    TestUtils.waitUntilTrue(() => cacheEntry.cleanStarted,
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
       "Failed to cleanup cache entry after invalidation")
 
     // first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
@@ -272,11 +271,11 @@ class RemoteIndexCacheTest {
     verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
 
     // verify no index files on disk
-    assertFalse(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent,
+    assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
       s"Offset index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent,
+    assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
       s"Txn index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent,
+    assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
       s"Time index file should not be present on disk at ${tpDir.toPath}")
     assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
       s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
@@ -287,7 +286,7 @@ class RemoteIndexCacheTest {
     // cache is empty at beginning
     assertTrue(cache.internalCache.asMap().isEmpty)
     // verify that cleaner thread is running
-    TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+    TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
     // create a new entry
     val spyEntry = generateSpyCacheEntry()
     // an exception should not close the cleaner thread
@@ -297,11 +296,11 @@ class RemoteIndexCacheTest {
     // trigger cleanup
     cache.internalCache.invalidate(key)
     // wait for cleanup to start
-    TestUtils.waitUntilTrue(() => spyEntry.cleanStarted, "Failed while waiting for clean up to start")
+    TestUtils.waitUntilTrue(() => spyEntry.isCleanStarted, "Failed while waiting for clean up to start")
     // Give the thread cleaner thread some time to throw an exception
     Thread.sleep(100)
     // Verify that Cleaner thread is still running even when exception is thrown in doWork()
-    var threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+    var threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
     assertEquals(1, threads.size,
       s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")
 
@@ -309,7 +308,7 @@ class RemoteIndexCacheTest {
     cache.close()
 
     // verify that the thread is closed properly
-    threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+    threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
     assertTrue(threads.isEmpty, s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")
     // if the thread is correctly being shutdown it will not be running
     assertFalse(cache.cleanerThread.isRunning, "Unexpected thread state=running. Check error logs.")
@@ -403,7 +402,7 @@ class RemoteIndexCacheTest {
   def testReloadCacheAfterClose(): Unit = {
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
-    cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -437,21 +436,21 @@ class RemoteIndexCacheTest {
     cache.close()
 
     // Reload the cache from the disk and check the cache size is same as earlier
-    val reloadedCache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+    val reloadedCache = new RemoteIndexCache(2, rsm, tpDir.toString)
     assertEquals(2, reloadedCache.internalCache.asMap().size())
     reloadedCache.close()
 
     verifyNoMoreInteractions(rsm)
   }
 
-  private def generateSpyCacheEntry(): Entry = {
+  private def generateSpyCacheEntry(): RemoteIndexCache.Entry = {
     val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
     val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
     val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
-    spy(new Entry(offsetIndex, timeIndex, txIndex))
+    spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
   }
 
   private def assertAtLeastOnePresent(cache: RemoteIndexCache, uuids: Uuid*): Unit = {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
new file mode 100644
index 00000000000..d730eeee31e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
+    private static final String TMP_FILE_SUFFIX = ".tmp";
+
+    public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
+    public static final String DIR_NAME = "remote-log-index-cache";
+
+    /**
+     * Directory where the index files will be stored on disk.
+     */
+    private final File cacheDir;
+
+    /**
+     * Represents if the cache is closed or not. Closing the cache is an irreversible operation.
+     */
+    private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
+
+    /**
+     * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
+     */
+    private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
+
+    /**
+     * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
+     * concurrent reads in-progress.
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Actual cache implementation that this file wraps around.
+     *
+     * The requirements for this internal cache is as follows:
+     * 1. Multiple threads should be able to read concurrently.
+     * 2. Fetch for missing keys should not block read for available keys.
+     * 3. Only one thread should fetch for a specific key.
+     * 4. Should support LRU-like policy.
+     *
+     * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
+     */
+    private final Cache<Uuid, Entry> internalCache;
+    private final RemoteStorageManager remoteStorageManager;
+    private final ShutdownableThread cleanerThread;
+
+    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+        this(1024, remoteStorageManager, logDir);
+    }
+
+    /**
+     * Creates RemoteIndexCache with the given configs.
+     *
+     * @param maxSize              maximum number of segment index entries to be cached.
+     * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
+     * @param logDir               log directory
+     */
+    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+        this.remoteStorageManager = remoteStorageManager;
+        cacheDir = new File(logDir, DIR_NAME);
+
+        internalCache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
+                // evicted (means removal due to the policy)
+                .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+                    // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
+                    if (entry != null) {
+                        try {
+                            entry.markForCleanup();
+                        } catch (IOException e) {
+                            throw new KafkaException(e);
+                        }
+                        if (!expiredIndexes.offer(entry)) {
+                            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
+                        }
+                    } else {
+                        log.error("Received entry as null for key {} when the it is removed from the cache.", key);
+                    }
+                }).build();
+
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public Collection<Entry> expiredIndexes() {
+        return Collections.unmodifiableCollection(expiredIndexes);
+    }
+
+    // Visible for testing
+    public Cache<Uuid, Entry> internalCache() {
+        return internalCache;
+    }
+
+    // Visible for testing
+    public ShutdownableThread cleanerThread() {
+        return cleanerThread;
+    }
+
+    private ShutdownableThread createCleanerThread() {
+        ShutdownableThread thread = new ShutdownableThread("remote-log-index-cleaner") {
+            public void doWork() {
+                try {
+                    Entry entry = expiredIndexes.take();
+                    log.debug("Cleaning up index entry {}", entry);
+                    entry.cleanup();
+                } catch (InterruptedException ie) {
+                    // cleaner thread should only be interrupted when cache is being closed, else it's an error
+                    if (!isRemoteIndexCacheClosed.get()) {
+                        log.error("Cleaner thread received interruption but remote index cache is not closed", ie);
+                        // propagate the InterruptedException outside to correctly close the thread.
+                        throw new KafkaException(ie);
+                    } else {
+                        log.debug("Cleaner thread was interrupted on cache shutdown");
+                    }
+                } catch (Exception ex) {
+                    // do not exit for exceptions other than InterruptedException
+                    log.error("Error occurred while cleaning up expired entry", ex);
+                }
+            }
+
+        };
+        thread.setDaemon(true);
+
+        return thread;
+    }
+
+    private void init() throws IOException {
+        long start = Time.SYSTEM.hiResClockMs();
+
+        try {
+            Files.createDirectory(cacheDir.toPath());
+            log.info("Created new file {} for RemoteIndexCache", cacheDir);
+        } catch (FileAlreadyExistsException e) {
+            log.info("RemoteIndexCache directory {} already exists. Re-using the same directory.", cacheDir);
+        } catch (Exception e) {
+            log.error("Unable to create directory {} for RemoteIndexCache.", cacheDir, e);
+            throw new KafkaException(e);
+        }
+
+        // Delete any .deleted or .tmp files remained from the earlier run of the broker.
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            paths.forEach(path -> {
+                if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
+                        path.endsWith(TMP_FILE_SUFFIX)) {
+                    try {
+                        if (Files.deleteIfExists(path)) {
+                            log.debug("Deleted file path {} on cache initialization", path);
+                        }
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+                }
+            });
+        }
+
+        try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+            Iterator<Path> iterator = paths.iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                Path fileNamePath = path.getFileName();
+                if (fileNamePath == null)
+                    throw new KafkaException("Empty file name in remote index cache directory: " + cacheDir);
+
+                String indexFileName = fileNamePath.toString();
+                Uuid uuid = remoteLogSegmentIdFromRemoteIndexFileName(indexFileName);
+
+                // It is safe to update the internalCache non-atomically here since this function is always called by a single
+                // thread only.
+                if (!internalCache.asMap().containsKey(uuid)) {
+                    String fileNameWithoutSuffix = indexFileName.substring(0, indexFileName.indexOf("."));
+                    File offsetIndexFile = new File(cacheDir, fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
+                    File timestampIndexFile = new File(cacheDir, fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
+                    File txnIndexFile = new File(cacheDir, fileNameWithoutSuffix + TXN_INDEX_FILE_SUFFIX);
+
+                    // Create entries for each path if all the index files exist.
+                    if (Files.exists(offsetIndexFile.toPath()) &&
+                            Files.exists(timestampIndexFile.toPath()) &&
+                            Files.exists(txnIndexFile.toPath())) {
+                        long offset = offsetFromRemoteIndexFileName(indexFileName);
+                        OffsetIndex offsetIndex = new OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false);
+                        offsetIndex.sanityCheck();
+
+                        TimeIndex timeIndex = new TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false);
+                        timeIndex.sanityCheck();
+
+                        TransactionIndex txnIndex = new TransactionIndex(offset, txnIndexFile);
+                        txnIndex.sanityCheck();
+
+                        Entry entry = new Entry(offsetIndex, timeIndex, txnIndex);
+                        internalCache.put(uuid, entry);
+                    } else {
+                        // Delete all of them if any one of those indexes is not available for a specific segment id
+                        tryAll(Arrays.asList(
+                                () -> {
+                                    Files.deleteIfExists(offsetIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    Files.deleteIfExists(timestampIndexFile.toPath());
+                                    return null;
+                                },
+                                () -> {
+                                    Files.deleteIfExists(txnIndexFile.toPath());
+                                    return null;
+                                }));
+                    }
+                }
+            }
+        }
+        log.info("RemoteIndexCache starts up in {} ms.", Time.SYSTEM.hiResClockMs() - start);
+    }
+
+    private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
+                                Function<File, T> readIndex) throws IOException {
+        File indexFile = new File(cacheDir, file.getName());
+        T index = null;
+        if (Files.exists(indexFile.toPath())) {
+            try {
+                index = readIndex.apply(indexFile);
+            } catch (CorruptRecordException ex) {
+                log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
+            }
+        }
+
+        if (index == null) {
+            File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
+
+            try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata);) {
+                Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+            }
+
+            Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
+            index = readIndex.apply(indexFile);
+        }
+
+        return index;
+    }
+
+    public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        if (isRemoteIndexCacheClosed.get()) throw new IllegalStateException("Unable to fetch index for " +
+                "segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed.");
+
+        lock.readLock().lock();
+        try {
+            // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
+            // check for index close again
+            if (isRemoteIndexCacheClosed.get()) {
+                throw new IllegalStateException("Unable to fetch index for segment id="
+                        + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed.");
+            }
+
+            return internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
+                    (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata));
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        long startOffset = remoteLogSegmentMetadata.startOffset();
+
+        try {
+            File offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata);
+            OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    OffsetIndex index = new OffsetIndex(file, startOffset, Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File timeIndexFile = remoteTimeIndexFile(cacheDir, remoteLogSegmentMetadata);
+            TimeIndex timeIndex = loadIndexFile(timeIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    TimeIndex index = new TimeIndex(file, startOffset, Integer.MAX_VALUE, false);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+            File txnIndexFile = remoteTransactionIndexFile(cacheDir, remoteLogSegmentMetadata);
+            TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+                try {
+                    return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
+                } catch (RemoteStorageException e) {
+                    throw new KafkaException(e);
+                }
+            }, file -> {
+                try {
+                    TransactionIndex index = new TransactionIndex(startOffset, file);
+                    index.sanityCheck();
+                    return index;
+                } catch (IOException e) {
+                    throw new KafkaException(e);
+                }
+            });
+
+            return new Entry(offsetIndex, timeIndex, txnIndex);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        lock.readLock().lock();
+        try {
+            return getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public int lookupTimestamp(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long timestamp, long startingOffset) throws IOException {
+        lock.readLock().lock();
+        try {
+            return getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position;
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void close() {
+        // Make close idempotent and ensure no more reads allowed from henceforth. The in-progress reads will continue to
+        // completion (release the read lock) and then close will begin executing. Cleaner thread will immediately stop work.
+        if (!isRemoteIndexCacheClosed.getAndSet(true)) {
+            lock.writeLock().lock();
+            try {
+                log.info("Close initiated for RemoteIndexCache. Cache stats={}. Cache entries pending delete={}",
+                        internalCache.stats(), expiredIndexes.size());
+                // Initiate shutdown for cleaning thread
+                boolean shutdownRequired = cleanerThread.initiateShutdown();
+                // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
+                internalCache.asMap().forEach((uuid, entry) -> entry.close());
+                // wait for cleaner thread to shutdown
+                if (shutdownRequired) cleanerThread.awaitShutdown();
+
+                // Note that internal cache does not require explicit cleaning/closing. We don't want to invalidate or cleanup
+                // the cache as both would lead to triggering of removal listener.
+
+                log.info("Close completed for RemoteIndexCache");
+            } catch (InterruptedException e) {
+                throw new KafkaException(e);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+    }
+
+    public static class Entry implements AutoCloseable {
+
+        private final OffsetIndex offsetIndex;
+        private final TimeIndex timeIndex;
+        private final TransactionIndex txnIndex;
+
+        // This lock is used to synchronize cleanup methods and read methods. This ensures that cleanup (which changes the
+        // underlying files of the index) isn't performed while a read is in-progress for the entry. This is required in
+        // addition to using the thread safe cache because, while the thread safety of the cache ensures that we can read
+        // entries concurrently, it does not ensure that we won't mutate underlying files belonging to an entry.
+        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+        private boolean cleanStarted = false;
+
+        private boolean markedForCleanup = false;
+
+        public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
+            this.offsetIndex = offsetIndex;
+            this.timeIndex = timeIndex;
+            this.txnIndex = txnIndex;
+        }
+
+        // Visible for testing
+        public OffsetIndex offsetIndex() {
+            return offsetIndex;
+        }
+
+        // Visible for testing
+        public TimeIndex timeIndex() {
+            return timeIndex;
+        }
+
+        // Visible for testing
+        public TransactionIndex txnIndex() {
+            return txnIndex;
+        }
+
+        // Visible for testing
+        public boolean isCleanStarted() {
+            return cleanStarted;
+        }
+
+        // Visible for testing
+        public boolean isMarkedForCleanup() {
+            return markedForCleanup;
+        }
+
+        public OffsetPosition lookupOffset(long targetOffset) {
+            lock.readLock().lock();
+            try {
+                if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
+                else return offsetIndex.lookup(targetOffset);
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
+
+        public OffsetPosition lookupTimestamp(long timestamp, long startingOffset) throws IOException {
+            lock.readLock().lock();
+            try {
+                if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
+
+                TimestampOffset timestampOffset = timeIndex.lookup(timestamp);
+                return offsetIndex.lookup(Math.max(startingOffset, timestampOffset.offset));
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
+
+        public void markForCleanup() throws IOException {
+            lock.writeLock().lock();
+            try {
+                if (!markedForCleanup) {
+                    markedForCleanup = true;
+                    offsetIndex.renameTo(new File(Utils.replaceSuffix(offsetIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+                    timeIndex.renameTo(new File(Utils.replaceSuffix(timeIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+                    txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        public void cleanup() throws IOException {
+            lock.writeLock().lock();
+            try {
+                markForCleanup();
+                // no-op if clean is done already
+                if (!cleanStarted) {
+                    cleanStarted = true;
+
+                    List<StorageAction<Void, Exception>> actions = Arrays.asList(() -> {
+                        offsetIndex.deleteIfExists();
+                        return null;
+                    }, () -> {
+                        timeIndex.deleteIfExists();
+                        return null;
+                    }, () -> {
+                        txnIndex.deleteIfExists();
+                        return null;
+                    });
+
+                    tryAll(actions);
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        @Override
+        public void close() {
+            lock.writeLock().lock();
+            try {
+                Utils.closeQuietly(offsetIndex, "OffsetIndex");
+                Utils.closeQuietly(timeIndex, "TimeIndex");
+                Utils.closeQuietly(txnIndex, "TransactionIndex");
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Entry{" +
+                    "offsetIndex=" + offsetIndex.file().getName() +
+                    ", timeIndex=" + timeIndex.file().getName() +
+                    ", txnIndex=" + txnIndex.file().getName() +
+                    '}';
+        }
+    }
+
+    /**
+     * Executes each entry in `actions` list even if one or more throws an exception. If any of them throws
+     * an IOException, it will be rethrown and adds all other encountered exceptions as suppressed to that IOException.
+     * Otherwise, it throws KafkaException wrapped with the first exception and the remaining exceptions are added as
+     * suppressed to the KafkaException.
+     *
+     * @param actions actions to be executes
+     * @throws IOException Any IOException encountered while executing those actions.
+     * @throws KafkaException Any other non IOExceptions are wrapped and thrown as KafkaException
+     */
+    private static void tryAll(List<StorageAction<Void, Exception>> actions) throws IOException {
+        IOException ioException = null;
+        List<Exception> exceptions = Collections.emptyList();
+        for (StorageAction<Void, Exception> action : actions) {
+            try {
+                action.execute();
+            } catch (IOException e) {
+                if (ioException == null) {
+                    ioException = e;
+                } else {
+                    if (exceptions.isEmpty()) {
+                        exceptions = new ArrayList<>();
+                    }
+                    exceptions.add(e);
+                }
+            } catch (Exception e) {
+                if (exceptions.isEmpty()) {
+                    exceptions = new ArrayList<>();
+                }
+                exceptions.add(e);
+            }
+        }
+
+        if (ioException != null) {
+            for (Exception exception : exceptions) {
+                ioException.addSuppressed(exception);
+            }
+            throw ioException;
+        } else if (!exceptions.isEmpty()) {
+            Iterator<Exception> iterator = exceptions.iterator();
+            KafkaException kafkaException = new KafkaException(iterator.next());
+            while (iterator.hasNext()) {
+                kafkaException.addSuppressed(iterator.next());
+            }
+
+            throw kafkaException;
+        }
+    }
+
+    private static Uuid remoteLogSegmentIdFromRemoteIndexFileName(String fileName) {
+        int underscoreIndex = fileName.indexOf("_");
+        int dotIndex = fileName.indexOf(".");
+        return Uuid.fromString(fileName.substring(underscoreIndex + 1, dotIndex));
+    }
+
+    private static long offsetFromRemoteIndexFileName(String fileName) {
+        return Long.parseLong(fileName.substring(0, fileName.indexOf("_")));
+    }
+
+    /**
+     * Generates prefix for file name for the on-disk representation of remote indexes.
+     * <p>
+     * Example of file name prefix is 45_fooid where 45 represents the base offset for the segment and
+     * fooid represents the unique {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId}
+     *
+     * @param remoteLogSegmentMetadata remote segment for the remote indexes
+     * @return string which should be used as prefix for on-disk representation of remote indexes
+     */
+    private static String generateFileNamePrefixForIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        long startOffset = remoteLogSegmentMetadata.startOffset();
+        Uuid segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id();
+        // uuid.toString uses URL encoding which is safe for filenames and URLs.
+        return startOffset + "_" + segmentId.toString();
+    }
+
+    public static File remoteOffsetIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata));
+    }
+
+    public static String remoteOffsetIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
+        return prefix + LogFileUtils.INDEX_FILE_SUFFIX;
+    }
+
+    public static File remoteTimeIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata));
+    }
+
+    public static String remoteTimeIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + TIME_INDEX_FILE_SUFFIX;
+    }
+
+    public static File remoteTransactionIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata));
+    }
+
+    public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+    }
+
+}
\ No newline at end of file