Posted to commits@kafka.apache.org by sa...@apache.org on 2023/10/08 07:54:16 UTC
[kafka] branch trunk updated: KAFKA-14912: Add a dynamic config for remote index cache size (#14381)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c3eb4395a1 KAFKA-14912: Add a dynamic config for remote index cache size (#14381)
1c3eb4395a1 is described below
commit 1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4
Author: hudeqi <12...@qq.com>
AuthorDate: Sun Oct 8 15:54:09 2023 +0800
KAFKA-14912: Add a dynamic config for remote index cache size (#14381)
Reviewers: Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>, Kamal Chandraprakash <ka...@gmail.com>, Divij Vaidya <di...@amazon.com>, Subhrodip Mohanta <he...@subho.xyz>
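With this change, remote.log.index.file.cache.total.size.bytes can be adjusted at runtime, for example with kafka-configs.sh --alter --entity-type brokers --entity-default, or through the Admin client. Below is a minimal sketch (not part of this patch) of the Admin-client route; the bootstrap address is an assumption, and the property string is the value behind REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class ResizeRemoteIndexCacheExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            try (Admin admin = Admin.create(props)) {
                // An empty resource name targets the cluster-wide broker default.
                ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("remote.log.index.file.cache.total.size.bytes", "2147483648"), // 2 GiB
                        AlterConfigOp.OpType.SET);
                // On the broker, DynamicBrokerConfig validates the value (it must be at least 1)
                // and then calls RemoteLogManager#resizeCacheSize, as the diff below shows.
                admin.incrementalAlterConfigs(Map.of(clusterDefault, List.of(op))).all().get();
            }
        }
    }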
---
.../java/kafka/log/remote/RemoteLogManager.java | 6 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 52 +++++++++-
.../kafka/log/remote/RemoteIndexCacheTest.scala | 107 ++++++++++++++++-----
.../kafka/server/DynamicBrokerConfigTest.scala | 26 +++++
.../log/remote/storage/RemoteLogManagerConfig.java | 2 +-
.../storage/internals/log/RemoteIndexCache.java | 78 +++++++++++----
6 files changed, 228 insertions(+), 43 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b25beb2e016..ebbfa9b28d7 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -193,7 +193,7 @@ public class RemoteLogManager implements Closeable {
remoteLogStorageManager = createRemoteStorageManager();
remoteLogMetadataManager = createRemoteLogMetadataManager();
- indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
+ indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
@@ -211,6 +211,10 @@ public class RemoteLogManager implements Closeable {
);
}
+ public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+ indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
+ }
+
private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 610634f8598..eac519b1dec 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -89,7 +89,8 @@ object DynamicBrokerConfig {
Set(KafkaConfig.MetricReporterClassesProp) ++
DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++
- ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
+ ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
+ DynamicRemoteLogConfig.ReconfigurableConfigs
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp, KafkaConfig.NumNetworkThreadsProp)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
@@ -271,6 +272,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
+ addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
}
/**
@@ -1129,3 +1131,51 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
}
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicRemoteLogConfig.ReconfigurableConfigs
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ newConfig.values.forEach { (k, v) =>
+ if (reconfigurableConfigs.contains(k)) {
+ if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
+ val newValue = v.asInstanceOf[Long]
+ val oldValue = getValue(server.config, k)
+ if (newValue != oldValue && newValue <= 0) {
+ val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
+ throw new ConfigException(s"$errorMsg, value should be at least 1")
+ }
+ }
+ }
+ }
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+ val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+ val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+ if (oldValue != newValue) {
+ val remoteLogManager = server.remoteLogManagerOpt
+ if (remoteLogManager.nonEmpty) {
+ remoteLogManager.get.resizeCacheSize(newValue)
+ info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
+ s"old value: $oldValue, new value: $newValue")
+ }
+ }
+ }
+
+ private def getValue(config: KafkaConfig, name: String): Long = {
+ name match {
+ case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
+ config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+ case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
+ }
+ }
+}
+
+object DynamicRemoteLogConfig {
+ val ReconfigurableConfigs = Set(
+ RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
+ )
+}
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 93e3f01511f..b9482217062 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -40,6 +40,7 @@ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable
class RemoteIndexCacheTest {
+ private val defaultRemoteIndexCacheSizeBytes = 1024 * 1024L
private val logger: Logger = LoggerFactory.getLogger(classOf[RemoteIndexCacheTest])
private val time = new MockTime()
private val brokerId = 1
@@ -64,24 +65,9 @@ class RemoteIndexCacheTest {
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
- cache = new RemoteIndexCache(rsm, tpDir.toString)
+ cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString)
- when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
- .thenAnswer(ans => {
- val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
- val indexType = ans.getArgument[IndexType](1)
- val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
- val timeIdx = createTimeIndexForSegmentMetadata(metadata)
- val txnIdx = createTxIndexForSegmentMetadata(metadata)
- maybeAppendIndexEntries(offsetIdx, timeIdx)
- indexType match {
- case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
- case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
- case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
- case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
- case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
- }
- })
+ mockRsmFetchIndex(rsm)
}
@AfterEach
@@ -183,9 +169,10 @@ class RemoteIndexCacheTest {
@Test
def testCacheEntryExpiry(): Unit = {
+ val estimateEntryBytesSize = estimateOneEntryBytesSize()
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+ cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -230,7 +217,7 @@ class RemoteIndexCacheTest {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+ cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -430,9 +417,10 @@ class RemoteIndexCacheTest {
@Test
def testReloadCacheAfterClose(): Unit = {
+ val estimateEntryBytesSize = estimateOneEntryBytesSize()
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+ cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -466,7 +454,7 @@ class RemoteIndexCacheTest {
cache.close()
// Reload the cache from disk and check that the cache size is the same as before
- val reloadedCache = new RemoteIndexCache(2, rsm, tpDir.toString)
+ val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
assertEquals(2, reloadedCache.internalCache.asMap().size())
reloadedCache.close()
@@ -524,6 +512,48 @@ class RemoteIndexCacheTest {
}
}
+ @Test
+ def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+ def getIndexFileFromRemoteCacheDir(suffix: String) = {
+ Files.walk(cache.cacheDir())
+ .filter(Files.isRegularFile(_))
+ .filter(path => path.getFileName.toString.endsWith(suffix))
+ .findAny()
+ }
+
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+
+ assertCacheSize(0)
+ // the first getIndexEntry call fetches the indexes via rsm#fetchIndex
+ val cacheEntry = cache.getIndexEntry(metadataList.head)
+ assertCacheSize(1)
+ assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+ assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+ cache.resizeCacheSize(1L)
+
+ // wait until entry is marked for deletion
+ TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+ "Failed to mark cache entry for cleanup after resizing cache.")
+ TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+ "Failed to cleanup cache entry after resizing cache.")
+
+ // verify no index files on remote cache dir
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+ s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+ s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+ s"Time index file should not be present on disk at ${cache.cacheDir()}")
+ TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+ s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+ assertTrue(cache.internalCache().estimatedSize() == 0)
+ }
+
@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
@@ -616,6 +646,40 @@ class RemoteIndexCacheTest {
}
}
+ private def estimateOneEntryBytesSize(): Long = {
+ val tp = new TopicPartition("estimate-entry-bytes-size", 0)
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
+ val tpDir = new File(logDir, tpId.toString)
+ Files.createDirectory(tpDir.toPath)
+ val rsm = mock(classOf[RemoteStorageManager])
+ mockRsmFetchIndex(rsm)
+ val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
+ val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+ val entry = cache.getIndexEntry(metadataList.head)
+ val entrySizeInBytes = entry.entrySizeBytes()
+ Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
+ entrySizeInBytes
+ }
+
+ private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
+ when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+ .thenAnswer(ans => {
+ val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+ val indexType = ans.getArgument[IndexType](1)
+ val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+ val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+ val txnIdx = createTxIndexForSegmentMetadata(metadata)
+ maybeAppendIndexEntries(offsetIdx, timeIdx)
+ indexType match {
+ case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+ case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+ case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+ case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+ case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+ }
+ })
+ }
+
private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
pw.write("Hello, world")
@@ -623,5 +687,4 @@ class RemoteIndexCacheTest {
// but it should be a multiple of the offset index entry size, which is 8.
pw.close()
}
-
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 17611b97211..861bbfe3611 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils
import kafka.zk.KafkaZkClient
@@ -787,6 +788,31 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}
+ @Test
+ def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+ origProps.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "2")
+
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[KafkaBroker])
+ val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager]))
+
+ Mockito.when(serverMock.config).thenReturn(config)
+ Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock))
+
+ val props = new Properties()
+
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
+ Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
+
+ Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+ }
+
def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
retentionMs: Long,
logLocalRetentionBytes: Long,
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1c937230a3f..966d841fb44 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
atLeast(0),
LOW,
REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
- .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+ .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
atLeast(1),
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 2bbe9d76ecf..1961207f787 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -99,6 +99,8 @@ public class RemoteIndexCache implements Closeable {
* concurrent reads in-progress.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final RemoteStorageManager remoteStorageManager;
+ private final ShutdownableThread cleanerThread;
/**
* Actual cache implementation that this file wraps around.
@@ -111,27 +113,50 @@ public class RemoteIndexCache implements Closeable {
*
* We use a {@link Caffeine} cache instead of implementing a thread-safe LRU cache on our own.
*/
- private final Cache<Uuid, Entry> internalCache;
- private final RemoteStorageManager remoteStorageManager;
- private final ShutdownableThread cleanerThread;
-
- public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
- this(1024, remoteStorageManager, logDir);
- }
+ private Cache<Uuid, Entry> internalCache;
/**
* Creates RemoteIndexCache with the given configs.
*
- * @param maxSize maximum number of segment index entries to be cached.
+ * @param maxSize maximum total size, in bytes, of the cached segment index entries.
* @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
* @param logDir log directory
*/
- public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
this.remoteStorageManager = remoteStorageManager;
cacheDir = new File(logDir, DIR_NAME);
- internalCache = Caffeine.newBuilder()
- .maximumSize(maxSize)
+ internalCache = initEmptyCache(maxSize);
+ init();
+
+ // Start cleaner thread that will clean the expired entries.
+ cleanerThread = createCleanerThread();
+ cleanerThread.start();
+ }
+
+ public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
+ lock.writeLock().lock();
+ try {
+ // When resizing, we always start from an empty cache, for two reasons:
+ // 1. Resizing is an infrequent operation, so there is no need to migrate entries from the old
+ // cache into the new one.
+ // 2. Caffeine evicts entries asynchronously, so an entry copied into the new cache could later
+ // be removed by the old cache's pending eviction, leaving the two caches inconsistent.
+ internalCache.invalidateAll();
+ log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
+ internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private Cache<Uuid, Entry> initEmptyCache(long maxSize) {
+ return Caffeine.newBuilder()
+ .maximumWeight(maxSize)
+ .weigher((Uuid key, Entry entry) -> {
+ return (int) entry.entrySizeBytes;
+ })
// the removal listener is invoked when the entry is either invalidated (manual removal by the caller)
// or evicted (removal driven by the eviction policy)
.removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
@@ -149,12 +174,6 @@ public class RemoteIndexCache implements Closeable {
log.error("Received entry as null for key {} when the it is removed from the cache.", key);
}
}).build();
-
- init();
-
- // Start cleaner thread that will clean the expired entries.
- cleanerThread = createCleanerThread();
- cleanerThread.start();
}
public Collection<Entry> expiredIndexes() {
@@ -166,6 +185,11 @@ public class RemoteIndexCache implements Closeable {
return internalCache;
}
+ // Visible for testing
+ public Path cacheDir() {
+ return cacheDir.toPath();
+ }
+
public void remove(Uuid key) {
lock.readLock().lock();
try {
@@ -442,7 +466,6 @@ public class RemoteIndexCache implements Closeable {
// Note that the internal cache does not require explicit cleaning/closing. We don't want to invalidate
// or clean up the cache, as either would trigger the removal listener.
-
log.info("Close completed for RemoteIndexCache");
} catch (InterruptedException e) {
throw new KafkaException(e);
@@ -468,10 +491,13 @@ public class RemoteIndexCache implements Closeable {
private boolean markedForCleanup = false;
+ private final long entrySizeBytes;
+
public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
this.offsetIndex = offsetIndex;
this.timeIndex = timeIndex;
this.txnIndex = txnIndex;
+ this.entrySizeBytes = estimatedEntrySize();
}
// Visible for testing
@@ -499,6 +525,22 @@ public class RemoteIndexCache implements Closeable {
return markedForCleanup;
}
+ public long entrySizeBytes() {
+ return entrySizeBytes;
+ }
+
+ private long estimatedEntrySize() {
+ lock.readLock().lock();
+ try {
+ return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.file().toPath());
+ } catch (IOException e) {
+ log.warn("Error occurred when estimating remote index cache entry bytes size, just set 0 firstly.", e);
+ return 0L;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
public OffsetPosition lookupOffset(long targetOffset) {
lock.readLock().lock();
try {
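The core of this last file's change is the switch from Caffeine's maximumSize (a bound on the number of entries) to maximumWeight plus a weigher (a bound on total bytes, using Entry#entrySizeBytes as the weight). Below is a minimal, Kafka-independent sketch of that pattern; the key/value types and sizes are made up for illustration.

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class WeightedCacheExample {
        public static void main(String[] args) {
            // Cap the cache at 1 KiB of total weight; each value weighs its length in bytes.
            Cache<String, byte[]> cache = Caffeine.newBuilder()
                    .maximumWeight(1024L)
                    .weigher((String key, byte[] value) -> value.length)
                    .removalListener((String key, byte[] value, RemovalCause cause) ->
                            System.out.printf("removed %s (%s)%n", key, cause))
                    .build();

            cache.put("a", new byte[600]);
            cache.put("b", new byte[600]); // total weight 1200 > 1024: an entry will be evicted
            cache.cleanUp();               // eviction is asynchronous; force pending work for the demo
            System.out.println("entries left: " + cache.estimatedSize());
        }
    }

Caffeine's asynchronous eviction is also why resizeCacheSize above swaps in a fresh cache rather than copying entries over: a copied entry could still be deleted by the old cache's pending eviction, leaving the two caches inconsistent.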