Posted to commits@kafka.apache.org by sa...@apache.org on 2023/10/08 07:54:16 UTC

[kafka] branch trunk updated: KAFKA-14912: Add a dynamic config for remote index cache size (#14381)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c3eb4395a1 KAFKA-14912: Add a dynamic config for remote index cache size (#14381)
1c3eb4395a1 is described below

commit 1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4
Author: hudeqi <12...@qq.com>
AuthorDate: Sun Oct 8 15:54:09 2023 +0800

    KAFKA-14912: Add a dynamic config for remote index cache size (#14381)
    
    Reviewers: Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>, Kamal Chandraprakash <ka...@gmail.com>, Divij Vaidya <di...@amazon.com>, Subhrodip Mohanta <he...@subho.xyz>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |   6 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  52 +++++++++-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 107 ++++++++++++++++-----
 .../kafka/server/DynamicBrokerConfigTest.scala     |  26 +++++
 .../log/remote/storage/RemoteLogManagerConfig.java |   2 +-
 .../storage/internals/log/RemoteIndexCache.java    |  78 +++++++++++----
 6 files changed, 228 insertions(+), 43 deletions(-)
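
This commit makes remote.log.index.file.cache.total.size.bytes a public broker
config that can be updated without a restart. As a usage sketch (the broker
address and the 2 GiB value are illustrative assumptions), the cache budget
could be resized at runtime with the standard kafka-configs.sh tool:

    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type brokers --entity-default --alter \
      --add-config remote.log.index.file.cache.total.size.bytes=2147483648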

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b25beb2e016..ebbfa9b28d7 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -193,7 +193,7 @@ public class RemoteLogManager implements Closeable {
 
         remoteLogStorageManager = createRemoteStorageManager();
         remoteLogMetadataManager = createRemoteLogMetadataManager();
-        indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
+        indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
 
@@ -211,6 +211,10 @@ public class RemoteLogManager implements Closeable {
         );
     }
 
+    public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+        indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
+    }
+
     private void removeMetrics() {
         metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
         remoteStorageReaderThreadPool.removeMetrics();
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 610634f8598..eac519b1dec 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -89,7 +89,8 @@ object DynamicBrokerConfig {
     Set(KafkaConfig.MetricReporterClassesProp) ++
     DynamicListenerConfig.ReconfigurableConfigs ++
     SocketServer.ReconfigurableConfigs ++
-    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
+    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
+    DynamicRemoteLogConfig.ReconfigurableConfigs
 
   private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp, KafkaConfig.NumNetworkThreadsProp)
   private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
@@ -271,6 +272,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
     addBrokerReconfigurable(kafkaServer.socketServer)
     addBrokerReconfigurable(new DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
+    addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
   }
 
   /**
@@ -1129,3 +1131,51 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfig.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (reconfigurableConfigs.contains(k)) {
+        if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
+          val newValue = v.asInstanceOf[Long]
+          val oldValue = getValue(server.config, k)
+          if (newValue != oldValue && newValue <= 0) {
+            val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
+            throw new ConfigException(s"$errorMsg, value should be at least 1")
+          }
+        }
+      }
+    }
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    if (oldValue != newValue) {
+      val remoteLogManager = server.remoteLogManagerOpt
+      if (remoteLogManager.nonEmpty) {
+        remoteLogManager.get.resizeCacheSize(newValue)
+        info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
+    }
+  }
+
+  private def getValue(config: KafkaConfig, name: String): Long = {
+    name match {
+      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
+        config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+      case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
+    }
+  }
+}
+
+object DynamicRemoteLogConfig {
+  val ReconfigurableConfigs = Set(
+    RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
+  )
+}
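
The reconfiguration path added above can also be exercised programmatically.
The following is a minimal sketch, assuming a broker at localhost:9092 and an
illustrative 2 GiB target value: the client issues incrementalAlterConfigs,
the broker runs DynamicRemoteLogConfig.validateReconfiguration (rejecting
changed values below 1), and reconfigure() then calls
RemoteLogManager.resizeCacheSize with the new value.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ResizeRemoteIndexCacheExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local test broker
            try (Admin admin = Admin.create(props)) {
                // An empty resource name targets the cluster-wide broker default,
                // the same scope the DynamicBrokerConfigTest below updates.
                ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
                AlterConfigOp setCacheSize = new AlterConfigOp(
                        new ConfigEntry("remote.log.index.file.cache.total.size.bytes", "2147483648"),
                        AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> update =
                        Collections.singletonMap(brokerDefaults, Collections.singletonList(setCacheSize));
                admin.incrementalAlterConfigs(update).all().get();
            }
        }
    }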
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 93e3f01511f..b9482217062 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -40,6 +40,7 @@ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import scala.collection.mutable
 
 class RemoteIndexCacheTest {
+  private val defaultRemoteIndexCacheSizeBytes = 1024 * 1024L
   private val logger: Logger = LoggerFactory.getLogger(classOf[RemoteIndexCacheTest])
   private val time = new MockTime()
   private val brokerId = 1
@@ -64,24 +65,9 @@ class RemoteIndexCacheTest {
     rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
 
-    cache = new RemoteIndexCache(rsm, tpDir.toString)
+    cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString)
 
-    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
-      .thenAnswer(ans => {
-        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
-        val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
-        maybeAppendIndexEntries(offsetIdx, timeIdx)
-        indexType match {
-          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
-          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
-          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
-          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
-          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
-        }
-      })
+    mockRsmFetchIndex(rsm)
   }
 
   @AfterEach
@@ -183,9 +169,10 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCacheEntryExpiry(): Unit = {
+    val estimateEntryBytesSize = estimateOneEntryBytesSize()
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
-    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -230,7 +217,7 @@ class RemoteIndexCacheTest {
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
 
-    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -430,9 +417,10 @@ class RemoteIndexCacheTest {
 
   @Test
   def testReloadCacheAfterClose(): Unit = {
+    val estimateEntryBytesSize = estimateOneEntryBytesSize()
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
-    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
     val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
     val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
 
@@ -466,7 +454,7 @@ class RemoteIndexCacheTest {
     cache.close()
 
     // Reload the cache from the disk and check the cache size is same as earlier
-    val reloadedCache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
     assertEquals(2, reloadedCache.internalCache.asMap().size())
     reloadedCache.close()
 
@@ -524,6 +512,48 @@ class RemoteIndexCacheTest {
     }
   }
 
+  @Test
+  def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      Files.walk(cache.cacheDir())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+
+    assertCacheSize(0)
+    // getIndex for first time will call rsm#fetchIndex
+    val cacheEntry = cache.getIndexEntry(metadataList.head)
+    assertCacheSize(1)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+    cache.resizeCacheSize(1L)
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after resizing cache.")
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+      "Failed to cleanup cache entry after resizing cache.")
+
+    // verify no index files on remote cache dir
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+    assertTrue(cache.internalCache().estimatedSize() == 0)
+  }
+
   @Test
   def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
     // create Corrupt Offset Index File
@@ -616,6 +646,40 @@ class RemoteIndexCacheTest {
     }
   }
 
+  private def estimateOneEntryBytesSize(): Long = {
+    val tp = new TopicPartition("estimate-entry-bytes-size", 0)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
+    val tpDir = new File(logDir, tpId.toString)
+    Files.createDirectory(tpDir.toPath)
+    val rsm = mock(classOf[RemoteStorageManager])
+    mockRsmFetchIndex(rsm)
+    val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
+    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+    val entry = cache.getIndexEntry(metadataList.head)
+    val entrySizeInBytes = entry.entrySizeBytes()
+    Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
+    entrySizeInBytes
+  }
+
+  private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+  }
+
   private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
     val pw =  new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
     pw.write("Hello, world")
@@ -623,5 +687,4 @@ class RemoteIndexCacheTest {
     // but it should be a multiple of the offset index entry size, which is 8 bytes.
     pw.close()
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 17611b97211..861bbfe3611 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletionStage
 import java.util.concurrent.atomic.AtomicReference
 import kafka.controller.KafkaController
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.utils.TestUtils
 import kafka.zk.KafkaZkClient
@@ -787,6 +788,31 @@ class DynamicBrokerConfigTest {
     verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
   }
 
+  @Test
+  def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "2")
+
+    val config = KafkaConfig(origProps)
+    val serverMock = Mockito.mock(classOf[KafkaBroker])
+    val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
+
+    val props = new Properties()
+
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
+    Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
+
+    Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+  }
+
   def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
                                             retentionMs: Long,
                                             logLocalRetentionBytes: Long,
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1c937230a3f..966d841fb44 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                                   atLeast(0),
                                   LOW,
                                   REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-                  .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+                  .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
                                   LONG,
                                   DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
                                   atLeast(1),
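
The one-line change above promotes the config from defineInternal to define,
which is what makes it appear in the broker's generated configuration docs.
A small sketch of that distinction, using an illustrative default value and
doc string rather than the real ones from RemoteLogManagerConfig:

    import org.apache.kafka.common.config.ConfigDef;

    import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

    public class DefineVsDefineInternal {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                    // Public: included in generated documentation such as toHtmlTable().
                    .define("remote.log.index.file.cache.total.size.bytes",
                            ConfigDef.Type.LONG,
                            1024L * 1024 * 1024,   // illustrative default for this sketch
                            atLeast(1),
                            ConfigDef.Importance.LOW,
                            "Illustrative doc string for the index cache size config.")
                    // Internal: still configurable, but omitted from the generated docs.
                    .defineInternal("some.internal.knob", ConfigDef.Type.INT, 1, ConfigDef.Importance.LOW);
            System.out.println(def.toHtmlTable().contains("remote.log.index.file.cache.total.size.bytes")); // true
            System.out.println(def.toHtmlTable().contains("some.internal.knob"));                           // false
        }
    }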
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 2bbe9d76ecf..1961207f787 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -99,6 +99,8 @@ public class RemoteIndexCache implements Closeable {
      * concurrent reads in-progress.
      */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final RemoteStorageManager remoteStorageManager;
+    private final ShutdownableThread cleanerThread;
 
     /**
      * Actual cache implementation that this file wraps around.
@@ -111,27 +113,50 @@ public class RemoteIndexCache implements Closeable {
      *
      * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
      */
-    private final Cache<Uuid, Entry> internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
-
-    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
-        this(1024, remoteStorageManager, logDir);
-    }
+    private Cache<Uuid, Entry> internalCache;
 
     /**
      * Creates RemoteIndexCache with the given configs.
      *
-     * @param maxSize              maximum number of segment index entries to be cached.
+     * @param maxSize              maximum total size, in bytes, of segment index entries to be cached.
      * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
      * @param logDir               log directory
      */
-    public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+    public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
         this.remoteStorageManager = remoteStorageManager;
         cacheDir = new File(logDir, DIR_NAME);
 
-        internalCache = Caffeine.newBuilder()
-                .maximumSize(maxSize)
+        internalCache = initEmptyCache(maxSize);
+        init();
+
+        // Start cleaner thread that will clean the expired entries.
+        cleanerThread = createCleanerThread();
+        cleanerThread.start();
+    }
+
+    public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+        lock.writeLock().lock();
+        try {
+            // When resizing the cache, we always start from an empty cache. There are two main reasons:
+            // 1. Resizing is an infrequent operation, so there is no need to carry entries over from the
+            // old cache into the new one.
+            // 2. Caffeine evicts entries asynchronously, so an entry copied into the new cache could still
+            // be cleaned up by the old cache afterwards, leaving the two caches in an inconsistent state.
+            internalCache.invalidateAll();
+            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
+            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private Cache<Uuid, Entry> initEmptyCache(long maxSize) {
+        return Caffeine.newBuilder()
+                .maximumWeight(maxSize)
+                .weigher((Uuid key, Entry entry) -> {
+                    return (int) entry.entrySizeBytes;
+                })
                 // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
                 // evicted (means removal due to the policy)
                 .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
@@ -149,12 +174,6 @@ public class RemoteIndexCache implements Closeable {
                         log.error("Received entry as null for key {} when the it is removed from the cache.", key);
                     }
                 }).build();
-
-        init();
-
-        // Start cleaner thread that will clean the expired entries.
-        cleanerThread = createCleanerThread();
-        cleanerThread.start();
     }
 
     public Collection<Entry> expiredIndexes() {
@@ -166,6 +185,11 @@ public class RemoteIndexCache implements Closeable {
         return internalCache;
     }
 
+    // Visible for testing
+    public Path cacheDir() {
+        return cacheDir.toPath();
+    }
+
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
@@ -442,7 +466,6 @@ public class RemoteIndexCache implements Closeable {
 
                 // Note that internal cache does not require explicit cleaning/closing. We don't want to invalidate or cleanup
                 // the cache as both would lead to triggering of removal listener.
-
                 log.info("Close completed for RemoteIndexCache");
             } catch (InterruptedException e) {
                 throw new KafkaException(e);
@@ -468,10 +491,13 @@ public class RemoteIndexCache implements Closeable {
 
         private boolean markedForCleanup = false;
 
+        private final long entrySizeBytes;
+
         public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
             this.offsetIndex = offsetIndex;
             this.timeIndex = timeIndex;
             this.txnIndex = txnIndex;
+            this.entrySizeBytes = estimatedEntrySize();
         }
 
         // Visible for testing
@@ -499,6 +525,22 @@ public class RemoteIndexCache implements Closeable {
             return markedForCleanup;
         }
 
+        public long entrySizeBytes() {
+            return entrySizeBytes;
+        }
+
+        private long estimatedEntrySize() {
+            lock.readLock().lock();
+            try {
+                return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.file().toPath());
+            } catch (IOException e) {
+                log.warn("Error occurred when estimating remote index cache entry bytes size, just set 0 firstly.", e);
+                return 0L;
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
+
         public OffsetPosition lookupOffset(long targetOffset) {
             lock.readLock().lock();
             try {
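
For context on the Caffeine features used above: maximumWeight() together with
a weigher() bounds the cache by the sum of per-entry weights (bytes, in this
commit) instead of by entry count, and eviction runs asynchronously, which is
why the tests earlier in this commit wait for cleanup instead of asserting
immediately. A self-contained sketch with a toy byte[] cache (not Kafka's
RemoteIndexCache.Entry type):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class WeightedCacheSketch {
        public static void main(String[] args) {
            Cache<String, byte[]> cache = Caffeine.newBuilder()
                    .maximumWeight(1024L * 1024L)                         // total budget: 1 MiB of payload
                    .weigher((String key, byte[] value) -> value.length)  // per-entry cost in bytes
                    .removalListener((String key, byte[] value, RemovalCause cause) ->
                            System.out.println("Removed " + key + " (" + cause + ")"))
                    .build();

            cache.put("a", new byte[700_000]);
            cache.put("b", new byte[700_000]); // total weight now exceeds the budget
            cache.cleanUp();                   // eviction is asynchronous; force pending work for the demo
            System.out.println("entries left: " + cache.estimatedSize()); // expected: 1
        }
    }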