Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/11 18:25:30 UTC
[kafka] branch trunk updated: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e2f8787137 KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)
7e2f8787137 is described below
commit 7e2f87871371d4d88187f73f43090488612ef42a
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Tue Jul 11 23:55:23 2023 +0530
KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. (#13275)
KAFKA-14522 Rewrite and Move of RemoteIndexCache to storage module.
Cleaned up index file suffix usages and other minor cleanups
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Kamal Chandraprakash <ka...@gmail.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>
---
build.gradle | 5 +-
checkstyle/import-control.xml | 1 +
checkstyle/suppressions.xml | 2 +-
.../java/kafka/log/remote/RemoteLogManager.java | 4 +-
core/src/main/scala/kafka/log/LocalLog.scala | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
.../scala/kafka/log/remote/RemoteIndexCache.scala | 448 --------------
.../test/scala/unit/kafka/log/LogManagerTest.scala | 5 +-
.../kafka/log/remote/RemoteIndexCacheTest.scala | 59 +-
.../storage/internals/log/RemoteIndexCache.java | 671 +++++++++++++++++++++
10 files changed, 713 insertions(+), 490 deletions(-)
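At a glance, the rewritten Java class keeps the public surface the Scala version exposed: construct it over a RemoteStorageManager and a log directory, look up positions by segment metadata, and close it when the RemoteLogManager shuts down. A minimal usage sketch under those assumptions (the helper name and its callers are illustrative, not part of this commit; the caller supplies the RemoteStorageManager and metadata):

    import java.io.IOException;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
    import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
    import org.apache.kafka.storage.internals.log.RemoteIndexCache;

    public class RemoteIndexCacheUsageSketch {
        // Hypothetical helper showing the relocated API end to end.
        static int remotePosition(RemoteStorageManager rsm, String logDir,
                                  RemoteLogSegmentMetadata segmentMetadata,
                                  long targetOffset) throws IOException {
            // The constructor now throws IOException because init() creates
            // $logDir/remote-log-index-cache on disk; maxSize caps cached segments.
            try (RemoteIndexCache cache = new RemoteIndexCache(1024, rsm, logDir)) {
                // First access fetches the offset index via the RSM and mmaps it;
                // subsequent lookups for the same segment hit the local cache.
                return cache.lookupOffset(segmentMetadata, targetOffset);
            }
        }
    }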
diff --git a/build.gradle b/build.gradle
index eef7aa68abd..afdd4863dcc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -875,7 +875,6 @@ project(':core') {
implementation libs.argparse4j
- implementation libs.caffeine
implementation libs.commonsValidator
implementation libs.jacksonDatabind
implementation libs.jacksonModuleScala
@@ -930,7 +929,8 @@ project(':core') {
testImplementation(libs.jfreechart) {
exclude group: 'junit', module: 'junit'
}
-
+ testImplementation libs.caffeine
+
generator project(':generator')
}
@@ -1703,6 +1703,7 @@ project(':storage') {
implementation project(':storage:api')
implementation project(':server-common')
implementation project(':clients')
+ implementation libs.caffeine
implementation libs.slf4jApi
implementation libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ee8355fef12..67b8ea08d84 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -269,6 +269,7 @@
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
+ <allow pkg="com.github.benmanes.caffeine.cache" />
</subpackage>
<subpackage name="shell">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6e2a8c0ca4e..3f2df5a32b6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig).java"/>
<suppress checks="NPathComplexity"
- files="LogValidator.java"/>
+ files="(LogValidator|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 7e528a6bd39..6831230287b 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -58,11 +58,13 @@ import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
+import org.apache.kafka.storage.internals.log.RemoteIndexCache;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
@@ -160,7 +162,7 @@ public class RemoteLogManager implements Closeable {
String logDir,
String clusterId,
Time time,
- Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+ Function<TopicPartition, Optional<UnifiedLog>> fetchLog) throws IOException {
this.rlmConfig = rlmConfig;
this.brokerId = brokerId;
this.logDir = logDir;
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 34442e3c3f5..b90178d5c5d 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -593,7 +593,7 @@ class LocalLog(@volatile private var _dir: File,
object LocalLog extends Logging {
/** a file that is scheduled to be deleted */
- private[log] val DeletedFileSuffix = ".deleted"
+ private[log] val DeletedFileSuffix = LogFileUtils.DELETED_FILE_SUFFIX
/** A temporary file that is being used for log cleaning */
private[log] val CleanedFileSuffix = ".cleaned"
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5e20d916310..913661fd4f4 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -17,8 +17,6 @@
package kafka.log
-import kafka.log.remote.RemoteIndexCache
-
import java.io._
import java.nio.file.Files
import java.util.concurrent._
@@ -43,7 +41,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
import scala.annotation.nowarn
@@ -397,7 +395,7 @@ class LogManager(logDirs: Seq[File],
logDir.isDirectory &&
// Ignore the remote-log-index-cache directory as that is the index cache maintained by the tiered storage subsystem,
// not a topic-partition dir.
- !logDir.getName.equals(RemoteIndexCache.DirName) &&
+ !logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
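The hunk above is why DIR_NAME matters outside the storage module: remote-log-index-cache sits directly inside a data directory, next to topic-partition directories, so log loading has to skip it explicitly. A hedged Java sketch of that filter (logDirsToLoad is an illustrative helper, not a method in this commit):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.storage.internals.log.RemoteIndexCache;

    public class LogDirScanSketch {
        // Hypothetical sketch of the directory scan performed while loading logs.
        static List<File> logDirsToLoad(File dataDir) {
            File[] children = dataDir.listFiles();
            if (children == null) return Collections.emptyList();
            return Arrays.stream(children)
                .filter(File::isDirectory)
                // remote-log-index-cache is maintained by the tiered storage
                // subsystem and is not a topic-partition directory, so skip it.
                .filter(dir -> !dir.getName().equals(RemoteIndexCache.DIR_NAME))
                .collect(Collectors.toList());
        }
    }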
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
deleted file mode 100644
index 7c035c1a336..00000000000
--- a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
+++ /dev/null
@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.log.remote
-
-import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause}
-import kafka.log.UnifiedLog
-import kafka.log.remote.RemoteIndexCache.{DirName, offsetFromRemoteIndexFileName, RemoteLogIndexCacheCleanerThread, remoteLogSegmentIdFromRemoteIndexFileName, remoteOffsetIndexFile, remoteTimeIndexFile, remoteTransactionIndexFile}
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{CoreUtils, Logging, threadsafe}
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.utils.{Utils, Time}
-import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
-import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
-import org.apache.kafka.server.util.ShutdownableThread
-
-import java.io.{File, InputStream}
-import java.nio.file.{FileAlreadyExistsException, Files, Path}
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-object RemoteIndexCache {
- val DirName = "remote-log-index-cache"
- val TmpFileSuffix = ".tmp"
- val RemoteLogIndexCacheCleanerThread = "remote-log-index-cleaner"
-
- def remoteLogSegmentIdFromRemoteIndexFileName(fileName: String): Uuid = {
- val underscoreIndex = fileName.indexOf("_")
- val dotIndex = fileName.indexOf(".")
- Uuid.fromString(fileName.substring(underscoreIndex + 1, dotIndex))
- }
-
- def offsetFromRemoteIndexFileName(fileName: String): Long = {
- fileName.substring(0, fileName.indexOf("_")).toLong
- }
-
- /**
- * Generates prefix for file name for the on-disk representation of remote indexes.
- *
- * Example of file name prefix is 45_dsdsd where 45 represents the base offset for the segment and
- * dsdsd represents the unique [[RemoteLogSegmentId]]
- *
- * @param remoteLogSegmentMetadata remote segment for the remote indexes
- * @return string which should be used as prefix for on-disk representation of remote indexes
- */
- private def generateFileNamePrefixForIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
- val startOffset = remoteLogSegmentMetadata.startOffset
- val segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id
- // uuid.toString uses URL encoding which is safe for filenames and URLs.
- s"${startOffset.toString}_${segmentId.toString}"
- }
-
- def remoteOffsetIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
- new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata))
- }
-
- def remoteOffsetIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
- val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
- prefix + UnifiedLog.IndexFileSuffix
- }
-
- def remoteTimeIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
- new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata))
- }
-
- def remoteTimeIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
- val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
- prefix + UnifiedLog.TimeIndexFileSuffix
- }
-
- def remoteTransactionIndexFile(dir: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): File = {
- new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata))
- }
-
- def remoteTransactionIndexFileName(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): String = {
- val prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata)
- prefix + UnifiedLog.TxnIndexFileSuffix
- }
-}
-
-@threadsafe
-class Entry(val offsetIndex: OffsetIndex, val timeIndex: TimeIndex, val txnIndex: TransactionIndex) extends AutoCloseable {
- // visible for testing
- private[remote] var markedForCleanup = false
- // visible for testing
- private[remote] var cleanStarted = false
- // This lock is used to synchronize cleanup methods and read methods. This ensures that cleanup (which changes the
- // underlying files of the index) isn't performed while a read is in-progress for the entry. This is required in
- // addition to using the thread safe cache because, while the thread safety of the cache ensures that we can read
- * entries concurrently, it does not ensure that we won't mutate underlying files belonging to an entry.
- private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
-
- def lookupOffset(targetOffset: Long): OffsetPosition = {
- inReadLock(lock) {
- if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
- offsetIndex.lookup(targetOffset)
- }
- }
-
- def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
- inReadLock(lock) {
- if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
- val timestampOffset = timeIndex.lookup(timestamp)
- offsetIndex.lookup(math.max(startingOffset, timestampOffset.offset))
- }
- }
-
- private[remote] def markForCleanup(): Unit = {
- inWriteLock(lock) {
- if (!markedForCleanup) {
- markedForCleanup = true
- Array(offsetIndex, timeIndex).foreach(index =>
- index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, "", LogFileUtils.DELETED_FILE_SUFFIX))))
- // txn index needs to be renamed separately since it's not of type AbstractIndex
- txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file.getPath, "",
- LogFileUtils.DELETED_FILE_SUFFIX)))
- }
- }
- }
-
- /**
- * Deletes the index files from the disk. Invoking #close is not required prior to this function.
- */
- private[remote] def cleanup(): Unit = {
- inWriteLock(lock) {
- markForCleanup()
- // no-op if clean is done already
- if (!cleanStarted) {
- cleanStarted = true
- CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
- }
- }
- }
-
- /**
- * Calls the underlying close method for each index which may lead to releasing resources such as mmap.
- * This function does not delete the index files.
- */
- @Override
- def close(): Unit = {
- inWriteLock(lock) {
- // close is no-op if entry is already marked for cleanup. Mmap resources are released during cleanup.
- if (!markedForCleanup) {
- Utils.closeQuietly(offsetIndex, "offset index")
- Utils.closeQuietly(timeIndex, "time index")
- Utils.closeQuietly(txnIndex, "transaction index")
- }
- }
- }
-
- override def toString: String = {
- s"RemoteIndexCacheEntry(" +
- s"timeIndex=${timeIndex.file.getName}, " +
- s"txnIndex=${txnIndex.file.getName}, " +
- s"offsetIndex=${offsetIndex.file.getName})"
- }
-}
-
-/**
- * This is an LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`.
- * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every
- * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available.
- *
- * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
- * the cache.
- *
- * Note that closing this cache does not delete the index files on disk.
- * Note that the cache eviction policy is based on the default implementation of Caffeine i.e.
- * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
- * sketch to probabilistically estimate the historic usage of an entry.
- *
- * @param maxSize maximum number of segment index entries to be cached.
- * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
- * @param logDir log directory
- */
-@threadsafe
-class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String)
- extends Logging with AutoCloseable {
- /**
- * Directory where the index files will be stored on disk.
- */
- private val cacheDir = new File(logDir, DirName)
- /**
- * Represents if the cache is closed or not. Closing the cache is an irreversible operation.
- */
- private val isRemoteIndexCacheClosed: AtomicBoolean = new AtomicBoolean(false)
- /**
- * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
- *
- * Visible for testing
- */
- private[remote] val expiredIndexes = new LinkedBlockingQueue[Entry]()
- /**
- * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
- * concurrent reads in-progress.
- */
- private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
- /**
- * Actual cache implementation that this file wraps around.
- *
- * The requirements for this internal cache are as follows:
- * 1. Multiple threads should be able to read concurrently.
- * 2. Fetch for missing keys should not block read for available keys.
- * 3. Only one thread should fetch for a specific key.
- * 4. Should support LRU-like policy.
- *
- * We use [[Caffeine]] cache instead of implementing a thread safe LRU cache on our own.
- *
- * Visible for testing.
- */
- private[remote] var internalCache: Cache[Uuid, Entry] = Caffeine.newBuilder()
- .maximumSize(maxSize)
- // removalListener is invoked when either the entry is invalidated (means manual removal by the caller) or
- // evicted (means removal due to the policy)
- .removalListener((key: Uuid, entry: Entry, _: RemovalCause) => {
- // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
- entry.markForCleanup()
- if (!expiredIndexes.offer(entry)) {
- error(s"Error while inserting entry $entry for key $key into the cleaner queue because queue is full.")
- }
- })
- .build[Uuid, Entry]()
-
- private def init(): Unit = {
- val start = Time.SYSTEM.hiResClockMs()
- try {
- Files.createDirectory(cacheDir.toPath)
- info(s"Created new file $cacheDir for RemoteIndexCache")
- } catch {
- case _: FileAlreadyExistsException =>
- info(s"RemoteIndexCache directory $cacheDir already exists. Re-using the same directory.")
- case e: Exception =>
- error(s"Unable to create directory $cacheDir for RemoteIndexCache.", e)
- throw e
- }
-
- // Delete any .deleted files remaining from an earlier run of the broker.
- Files.list(cacheDir.toPath).forEach((path: Path) => {
- if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) {
- if (Files.deleteIfExists(path))
- debug(s"Deleted file $path on cache initialization")
- }
- })
-
- Files.list(cacheDir.toPath).forEach((path: Path) => {
- val indexFileName = path.getFileName.toString
- val uuid = remoteLogSegmentIdFromRemoteIndexFileName(indexFileName)
- // It is safe to update the internalCache non-atomically here since this function is always called by a single
- // thread only.
- if (!internalCache.asMap().containsKey(uuid)) {
- val fileNameWithoutDotExtensions = indexFileName.substring(0, indexFileName.indexOf("."))
- val offsetIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.IndexFileSuffix)
- val timestampIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TimeIndexFileSuffix)
- val txnIndexFile = new File(cacheDir, fileNameWithoutDotExtensions + UnifiedLog.TxnIndexFileSuffix)
-
- // Create entries for each path if all the index files exist.
- if (Files.exists(offsetIndexFile.toPath) &&
- Files.exists(timestampIndexFile.toPath) &&
- Files.exists(txnIndexFile.toPath)) {
-
- val offset = offsetFromRemoteIndexFileName(indexFileName)
- val offsetIndex = new OffsetIndex(offsetIndexFile, offset, Int.MaxValue, false)
- offsetIndex.sanityCheck()
-
- val timeIndex = new TimeIndex(timestampIndexFile, offset, Int.MaxValue, false)
- timeIndex.sanityCheck()
-
- val txnIndex = new TransactionIndex(offset, txnIndexFile)
- txnIndex.sanityCheck()
-
- internalCache.put(uuid, new Entry(offsetIndex, timeIndex, txnIndex))
- } else {
- // Delete all of them if any one of those indexes is not available for a specific segment id
- Files.deleteIfExists(offsetIndexFile.toPath)
- Files.deleteIfExists(timestampIndexFile.toPath)
- Files.deleteIfExists(txnIndexFile.toPath)
- }
- }
- })
- info(s"RemoteIndexCache starts up in ${Time.SYSTEM.hiResClockMs() - start} ms.")
- }
-
- init()
-
- // Start cleaner thread that will clean the expired entries
- private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(RemoteLogIndexCacheCleanerThread) {
- setDaemon(true)
-
- override def doWork(): Unit = {
- var expiredEntryOpt: Option[Entry] = None
- try {
- expiredEntryOpt = Some(expiredIndexes.take())
- expiredEntryOpt.foreach( expiredEntry => {
- log.debug(s"Cleaning up index entry $expiredEntry")
- expiredEntry.cleanup()
- })
- } catch {
- case ie: InterruptedException =>
- // cleaner thread should only be interrupted when cache is being closed, else it's an error
- if (!isRemoteIndexCacheClosed.get()) {
- log.error("Cleaner thread received interruption but remote index cache is not closed", ie)
- // propagate the InterruptedException outside to correctly close the thread.
- throw ie
- } else {
- log.debug("Cleaner thread was interrupted on cache shutdown")
- }
- // do not exit for exceptions other than InterruptedException
- case ex: Throwable => log.error(s"Error occurred while cleaning up expired entry $expiredEntryOpt", ex)
- }
- }
- }
-
- cleanerThread.start()
-
- def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
- if (isRemoteIndexCacheClosed.get()) {
- throw new IllegalStateException(s"Unable to fetch index for " +
- s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
- }
-
- inReadLock(lock) {
- // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
- // check for index close again
- if (isRemoteIndexCacheClosed.get()) {
- throw new IllegalStateException(s"Unable to fetch index for " +
- s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
- }
-
- val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
- internalCache.get(cacheKey, (_: Uuid) => {
- def loadIndexFile[T](indexFile: File,
- fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
- readIndex: File => T): T = {
- def fetchAndCreateIndex(): T = {
- val tmpIndexFile = new File(cacheDir, indexFile.getName + RemoteIndexCache.TmpFileSuffix)
-
- val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
- try {
- Files.copy(inputStream, tmpIndexFile.toPath)
- } finally {
- if (inputStream != null) {
- inputStream.close()
- }
- }
-
- Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
- readIndex(indexFile)
- }
-
- if (Files.exists(indexFile.toPath)) {
- try {
- readIndex(indexFile)
- } catch {
- case ex: CorruptRecordException =>
- info(s"Error occurred while loading the stored index at ${indexFile.toPath}", ex)
- fetchAndCreateIndex()
- }
- } else {
- fetchAndCreateIndex()
- }
- }
-
- val startOffset = remoteLogSegmentMetadata.startOffset()
- val offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata)
- val offsetIndex: OffsetIndex = loadIndexFile(offsetIndexFile,
- rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
- file => {
- val index = new OffsetIndex(file, startOffset, Int.MaxValue, false)
- index.sanityCheck()
- index
- })
-
- val timeIndexFile = remoteTimeIndexFile(cacheDir, remoteLogSegmentMetadata)
- val timeIndex: TimeIndex = loadIndexFile(timeIndexFile,
- rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
- file => {
- val index = new TimeIndex(file, startOffset, Int.MaxValue, false)
- index.sanityCheck()
- index
- })
-
- val txnIndexFile = remoteTransactionIndexFile(cacheDir, remoteLogSegmentMetadata)
- val txnIndex: TransactionIndex = loadIndexFile(txnIndexFile,
- rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION),
- file => {
- val index = new TransactionIndex(startOffset, file)
- index.sanityCheck()
- index
- })
-
- new Entry(offsetIndex, timeIndex, txnIndex)
- })
- }
- }
-
- def lookupOffset(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, offset: Long): Int = {
- inReadLock(lock) {
- getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position
- }
- }
-
- def lookupTimestamp(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Int = {
- inReadLock(lock) {
- getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position
- }
- }
-
- /**
- * Close should synchronously cleanup the resources used by this cache.
- * This index is closed when [[RemoteLogManager]] is closed.
- */
- def close(): Unit = {
- // make close idempotent and ensure no more reads are allowed henceforth. The in-progress reads will continue to
- // completion (release the read lock) and then close will begin executing. Cleaner thread will immediately stop work.
- if (!isRemoteIndexCacheClosed.getAndSet(true)) {
- inWriteLock(lock) {
- info(s"Close initiated for RemoteIndexCache. Cache stats=${internalCache.stats}. " +
- s"Cache entries pending delete=${expiredIndexes.size()}")
- // Initiate shutdown for cleaning thread
- val shutdownRequired = cleanerThread.initiateShutdown
- // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
- internalCache.asMap().forEach((_, entry) => entry.close())
- // wait for cleaner thread to shutdown
- if (shutdownRequired) cleanerThread.awaitShutdown()
- // Note that internal cache does not require explicit cleaning / closing. We don't want to invalidate or cleanup
- // the cache as both would lead to triggering of removal listener.
- internalCache = null
- info(s"Close completed for RemoteIndexCache")
- }
- }
- }
-}
\ No newline at end of file
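Everything above is deleted in one piece; the design it documents (evict cheaply, delete files later) carries over unchanged into the Java rewrite below. Stripped of Kafka types, the handoff is: the Caffeine removal listener only enqueues the evicted entry, and a daemon cleaner thread blocks on the queue and performs the actual file I/O. A self-contained sketch of that pattern; FakeEntry and its cleanup() are illustrative stand-ins, not Kafka types:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;
    import java.util.concurrent.LinkedBlockingQueue;

    public class EvictionHandoffSketch {
        // Stand-in for RemoteIndexCache.Entry; cleanup() would delete index files.
        static final class FakeEntry {
            final String name;
            FakeEntry(String name) { this.name = name; }
            void cleanup() { System.out.println("deleting files for " + name); }
        }

        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<FakeEntry> expired = new LinkedBlockingQueue<>();

            Cache<Long, FakeEntry> cache = Caffeine.newBuilder()
                .maximumSize(2) // evictions follow Caffeine's Window TinyLfu policy
                // Fires on invalidate() and on size-based eviction alike; keep it
                // cheap and defer file deletion to the cleaner thread.
                .removalListener((Long key, FakeEntry entry, RemovalCause cause) -> {
                    if (entry != null) expired.offer(entry);
                })
                .build();

            Thread cleaner = new Thread(() -> {
                try {
                    while (true) expired.take().cleanup(); // blocks until work arrives
                } catch (InterruptedException ie) {
                    // interrupted only on shutdown, mirroring the cache's close()
                }
            }, "sketch-index-cleaner");
            cleaner.setDaemon(true);
            cleaner.start();

            for (long i = 0; i < 5; i++) cache.put(i, new FakeEntry("segment-" + i));
            Thread.sleep(500); // give async eviction and the cleaner time to run
        }
    }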
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index dff07ce547d..e1c593fbca5 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,6 @@
package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.log.remote.RemoteIndexCache
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.server.BrokerTopicStats
@@ -40,7 +39,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Future}
import java.util.{Collections, Properties}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}
import scala.collection.{Map, mutable}
import scala.collection.mutable.ArrayBuffer
@@ -376,7 +375,7 @@ class LogManagerTest {
@Test
def testLoadLogsSkipRemoteIndexCache(): Unit = {
val logDir = TestUtils.tempDir()
- val remoteIndexCache = new File(logDir, RemoteIndexCache.DirName)
+ val remoteIndexCache = new File(logDir, RemoteIndexCache.DIR_NAME)
remoteIndexCache.mkdir()
logManager = createLogManager(Seq(logDir))
logManager.loadLogs(logConfig, Map.empty)
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index a39190f9ef3..b087676bed2 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -16,15 +16,14 @@
*/
package kafka.log.remote
-import kafka.log.UnifiedLog
-import kafka.log.remote.RemoteIndexCache.{RemoteLogIndexCacheCleanerThread, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, TimeIndex, TransactionIndex}
-import kafka.utils.TestUtils
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -64,7 +63,7 @@ class RemoteIndexCacheTest {
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
- cache = new RemoteIndexCache(remoteStorageManager = rsm, logDir = tpDir.toString)
+ cache = new RemoteIndexCache(rsm, tpDir.toString)
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
@@ -72,12 +71,12 @@ class RemoteIndexCacheTest {
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
- val txIdx = createTxIndexForSegmentMetadata(metadata)
+ val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
- case IndexType.TRANSACTION => new FileInputStream(txIdx.file)
+ case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
@@ -94,7 +93,7 @@ class RemoteIndexCacheTest {
Utils.delete(logDir)
// Verify no lingering threads. It is important to have this as the very last statement in the @AfterEach
// because this may throw an exception and prevent cleanup after it
- TestUtils.assertNoNonDaemonThreads(RemoteIndexCache.RemoteLogIndexCacheCleanerThread)
+ TestUtils.assertNoNonDaemonThreads(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD)
}
@Test
@@ -113,11 +112,11 @@ class RemoteIndexCacheTest {
assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
// assert that parent directory for the index files is correct
- assertEquals(RemoteIndexCache.DirName, offsetIndexFile.getParent.getFileName.toString,
+ assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is created under incorrect parent")
- assertEquals(RemoteIndexCache.DirName, txnIndexFile.getParent.getFileName.toString,
+ assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
s"txnIndex=$txnIndexFile is created under incorrect parent")
- assertEquals(RemoteIndexCache.DirName, timeIndexFile.getParent.getFileName.toString,
+ assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
s"timeIndex=$timeIndexFile is created under incorrect parent")
}
@@ -156,7 +155,7 @@ class RemoteIndexCacheTest {
def testCacheEntryExpiry(): Unit = {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+ cache = new RemoteIndexCache(2, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -201,7 +200,7 @@ class RemoteIndexCacheTest {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+ cache = new RemoteIndexCache(2, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -242,9 +241,9 @@ class RemoteIndexCacheTest {
val cacheEntry = generateSpyCacheEntry()
// verify index files on disk
- assertTrue(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
- assertTrue(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
- assertTrue(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")
+ assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
+ assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
+ assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")
// add the spied entry into the cache, it will overwrite the non-spied entry
cache.internalCache.put(internalIndexKey, cacheEntry)
@@ -256,9 +255,9 @@ class RemoteIndexCacheTest {
cache.internalCache.invalidate(internalIndexKey)
// wait until entry is marked for deletion
- TestUtils.waitUntilTrue(() => cacheEntry.markedForCleanup,
+ TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
- TestUtils.waitUntilTrue(() => cacheEntry.cleanStarted,
+ TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
// first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
@@ -272,11 +271,11 @@ class RemoteIndexCacheTest {
verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
// verify no index files on disk
- assertFalse(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent,
+ assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${tpDir.toPath}")
- assertFalse(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent,
+ assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${tpDir.toPath}")
- assertFalse(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent,
+ assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
@@ -287,7 +286,7 @@ class RemoteIndexCacheTest {
// cache is empty at beginning
assertTrue(cache.internalCache.asMap().isEmpty)
// verify that cleaner thread is running
- TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+ TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
// create a new entry
val spyEntry = generateSpyCacheEntry()
// an exception should not close the cleaner thread
@@ -297,11 +296,11 @@ class RemoteIndexCacheTest {
// trigger cleanup
cache.internalCache.invalidate(key)
// wait for cleanup to start
- TestUtils.waitUntilTrue(() => spyEntry.cleanStarted, "Failed while waiting for clean up to start")
+ TestUtils.waitUntilTrue(() => spyEntry.isCleanStarted, "Failed while waiting for clean up to start")
// Give the cleaner thread some time to throw an exception
Thread.sleep(100)
// Verify that Cleaner thread is still running even when exception is thrown in doWork()
- var threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+ var threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
assertEquals(1, threads.size,
s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")
@@ -309,7 +308,7 @@ class RemoteIndexCacheTest {
cache.close()
// verify that the thread is closed properly
- threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
+ threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
assertTrue(threads.isEmpty, s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")
// if the thread is correctly being shutdown it will not be running
assertFalse(cache.cleanerThread.isRunning, "Unexpected thread state=running. Check error logs.")
@@ -403,7 +402,7 @@ class RemoteIndexCacheTest {
def testReloadCacheAfterClose(): Unit = {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
- cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+ cache = new RemoteIndexCache(2, rsm, tpDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
@@ -437,21 +436,21 @@ class RemoteIndexCacheTest {
cache.close()
// Reload the cache from the disk and check the cache size is same as earlier
- val reloadedCache = new RemoteIndexCache(maxSize = 2, rsm, logDir = tpDir.toString)
+ val reloadedCache = new RemoteIndexCache(2, rsm, tpDir.toString)
assertEquals(2, reloadedCache.internalCache.asMap().size())
reloadedCache.close()
verifyNoMoreInteractions(rsm)
}
- private def generateSpyCacheEntry(): Entry = {
+ private def generateSpyCacheEntry(): RemoteIndexCache.Entry = {
val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
- spy(new Entry(offsetIndex, timeIndex, txIndex))
+ spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
}
private def assertAtLeastOnePresent(cache: RemoteIndexCache, uuids: Uuid*): Unit = {
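The tests above key off the on-disk naming scheme <startOffset>_<segmentIdUuid><suffix> that both the deleted Scala object and the new Java class implement through remoteOffsetIndexFileName and friends. A small sketch of the round trip, mirroring offsetFromRemoteIndexFileName and remoteLogSegmentIdFromRemoteIndexFileName (the class and helper names here are illustrative):

    import org.apache.kafka.common.Uuid;

    public class RemoteIndexFileNameSketch {
        // Builds "<startOffset>_<segmentId><suffix>", e.g. "45_<base64-uuid>.index".
        // Uuid.toString uses base64 URL encoding, so it is safe in file names.
        static String fileName(long startOffset, Uuid segmentId, String suffix) {
            return startOffset + "_" + segmentId + suffix;
        }

        static long offsetOf(String fileName) {
            return Long.parseLong(fileName.substring(0, fileName.indexOf('_')));
        }

        static Uuid segmentIdOf(String fileName) {
            return Uuid.fromString(fileName.substring(fileName.indexOf('_') + 1, fileName.indexOf('.')));
        }

        public static void main(String[] args) {
            Uuid id = Uuid.randomUuid();
            String name = fileName(45L, id, ".index");
            // Both parsers recover exactly what fileName() encoded.
            System.out.println(offsetOf(name) == 45L && segmentIdOf(name).equals(id));
        }
    }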
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
new file mode 100644
index 00000000000..d730eeee31e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is an LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ * This class is thread safe.
+ */
+public class RemoteIndexCache implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+
+ public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
+ public static final String DIR_NAME = "remote-log-index-cache";
+
+ /**
+ * Directory where the index files will be stored on disk.
+ */
+ private final File cacheDir;
+
+ /**
+ * Represents if the cache is closed or not. Closing the cache is an irreversible operation.
+ */
+ private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
+
+ /**
+ * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
+ */
+ private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
+
+ /**
+ * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ /**
+ * Actual cache implementation that this file wraps around.
+ *
+ * The requirements for this internal cache are as follows:
+ * 1. Multiple threads should be able to read concurrently.
+ * 2. Fetch for missing keys should not block read for available keys.
+ * 3. Only one thread should fetch for a specific key.
+ * 4. Should support LRU-like policy.
+ *
+ * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
+ */
+ private final Cache<Uuid, Entry> internalCache;
+ private final RemoteStorageManager remoteStorageManager;
+ private final ShutdownableThread cleanerThread;
+
+ public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+ this(1024, remoteStorageManager, logDir);
+ }
+
+ /**
+ * Creates RemoteIndexCache with the given configs.
+ *
+ * @param maxSize maximum number of segment index entries to be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
+ * @param logDir log directory
+ */
+ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
+ this.remoteStorageManager = remoteStorageManager;
+ cacheDir = new File(logDir, DIR_NAME);
+
+ internalCache = Caffeine.newBuilder()
+ .maximumSize(maxSize)
+ // removalListener is invoked when either the entry is invalidated (means manual removal by the caller) or
+ // evicted (means removal due to the policy)
+ .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+ // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
+ if (entry != null) {
+ try {
+ entry.markForCleanup();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ if (!expiredIndexes.offer(entry)) {
+ log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
+ }
+ } else {
+ log.error("Received entry as null for key {} when the it is removed from the cache.", key);
+ }
+ }).build();
+
+ init();
+
+ // Start cleaner thread that will clean the expired entries.
+ cleanerThread = createCleanerThread();
+ cleanerThread.start();
+ }
+
+ public Collection<Entry> expiredIndexes() {
+ return Collections.unmodifiableCollection(expiredIndexes);
+ }
+
+ // Visible for testing
+ public Cache<Uuid, Entry> internalCache() {
+ return internalCache;
+ }
+
+ // Visible for testing
+ public ShutdownableThread cleanerThread() {
+ return cleanerThread;
+ }
+
+ private ShutdownableThread createCleanerThread() {
+ ShutdownableThread thread = new ShutdownableThread("remote-log-index-cleaner") {
+ public void doWork() {
+ try {
+ Entry entry = expiredIndexes.take();
+ log.debug("Cleaning up index entry {}", entry);
+ entry.cleanup();
+ } catch (InterruptedException ie) {
+ // cleaner thread should only be interrupted when cache is being closed, else it's an error
+ if (!isRemoteIndexCacheClosed.get()) {
+ log.error("Cleaner thread received interruption but remote index cache is not closed", ie);
+ // propagate the InterruptedException outside to correctly close the thread.
+ throw new KafkaException(ie);
+ } else {
+ log.debug("Cleaner thread was interrupted on cache shutdown");
+ }
+ } catch (Exception ex) {
+ // do not exit for exceptions other than InterruptedException
+ log.error("Error occurred while cleaning up expired entry", ex);
+ }
+ }
+
+ };
+ thread.setDaemon(true);
+
+ return thread;
+ }
+
+ private void init() throws IOException {
+ long start = Time.SYSTEM.hiResClockMs();
+
+ try {
+ Files.createDirectory(cacheDir.toPath());
+ log.info("Created new file {} for RemoteIndexCache", cacheDir);
+ } catch (FileAlreadyExistsException e) {
+ log.info("RemoteIndexCache directory {} already exists. Re-using the same directory.", cacheDir);
+ } catch (Exception e) {
+ log.error("Unable to create directory {} for RemoteIndexCache.", cacheDir, e);
+ throw new KafkaException(e);
+ }
+
+ // Delete any .deleted or .tmp files remaining from an earlier run of the broker.
+ try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+ paths.forEach(path -> {
+ if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) ||
+ path.endsWith(TMP_FILE_SUFFIX)) {
+ try {
+ if (Files.deleteIfExists(path)) {
+ log.debug("Deleted file path {} on cache initialization", path);
+ }
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+ });
+ }
+
+ try (Stream<Path> paths = Files.list(cacheDir.toPath())) {
+ Iterator<Path> iterator = paths.iterator();
+ while (iterator.hasNext()) {
+ Path path = iterator.next();
+ Path fileNamePath = path.getFileName();
+ if (fileNamePath == null)
+ throw new KafkaException("Empty file name in remote index cache directory: " + cacheDir);
+
+ String indexFileName = fileNamePath.toString();
+ Uuid uuid = remoteLogSegmentIdFromRemoteIndexFileName(indexFileName);
+
+ // It is safe to update the internalCache non-atomically here since this function is always called by a single
+ // thread only.
+ if (!internalCache.asMap().containsKey(uuid)) {
+ String fileNameWithoutSuffix = indexFileName.substring(0, indexFileName.indexOf("."));
+ File offsetIndexFile = new File(cacheDir, fileNameWithoutSuffix + INDEX_FILE_SUFFIX);
+ File timestampIndexFile = new File(cacheDir, fileNameWithoutSuffix + TIME_INDEX_FILE_SUFFIX);
+ File txnIndexFile = new File(cacheDir, fileNameWithoutSuffix + TXN_INDEX_FILE_SUFFIX);
+
+ // Create entries for each path if all the index files exist.
+ if (Files.exists(offsetIndexFile.toPath()) &&
+ Files.exists(timestampIndexFile.toPath()) &&
+ Files.exists(txnIndexFile.toPath())) {
+ long offset = offsetFromRemoteIndexFileName(indexFileName);
+ OffsetIndex offsetIndex = new OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false);
+ offsetIndex.sanityCheck();
+
+ TimeIndex timeIndex = new TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false);
+ timeIndex.sanityCheck();
+
+ TransactionIndex txnIndex = new TransactionIndex(offset, txnIndexFile);
+ txnIndex.sanityCheck();
+
+ Entry entry = new Entry(offsetIndex, timeIndex, txnIndex);
+ internalCache.put(uuid, entry);
+ } else {
+ // Delete all of them if any one of those indexes is not available for a specific segment id
+ tryAll(Arrays.asList(
+ () -> {
+ Files.deleteIfExists(offsetIndexFile.toPath());
+ return null;
+ },
+ () -> {
+ Files.deleteIfExists(timestampIndexFile.toPath());
+ return null;
+ },
+ () -> {
+ Files.deleteIfExists(txnIndexFile.toPath());
+ return null;
+ }));
+ }
+ }
+ }
+ }
+ log.info("RemoteIndexCache starts up in {} ms.", Time.SYSTEM.hiResClockMs() - start);
+ }
+
+ private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
+ Function<File, T> readIndex) throws IOException {
+ File indexFile = new File(cacheDir, file.getName());
+ T index = null;
+ if (Files.exists(indexFile.toPath())) {
+ try {
+ index = readIndex.apply(indexFile);
+ } catch (CorruptRecordException ex) {
+ log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
+ }
+ }
+
+ if (index == null) {
+ File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
+
+ try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata);) {
+ Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
+ index = readIndex.apply(indexFile);
+ }
+
+ return index;
+ }
+
+ public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ if (isRemoteIndexCacheClosed.get()) throw new IllegalStateException("Unable to fetch index for " +
+ "segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed.");
+
+ lock.readLock().lock();
+ try {
+ // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
+ // check for index close again
+ if (isRemoteIndexCacheClosed.get()) {
+ throw new IllegalStateException("Unable to fetch index for segment id="
+ + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed.");
+ }
+
+ return internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
+ (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ long startOffset = remoteLogSegmentMetadata.startOffset();
+
+ try {
+ File offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata);
+ OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+ try {
+ return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET);
+ } catch (RemoteStorageException e) {
+ throw new KafkaException(e);
+ }
+ }, file -> {
+ try {
+ OffsetIndex index = new OffsetIndex(file, startOffset, Integer.MAX_VALUE, false);
+ index.sanityCheck();
+ return index;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ });
+ File timeIndexFile = remoteTimeIndexFile(cacheDir, remoteLogSegmentMetadata);
+ TimeIndex timeIndex = loadIndexFile(timeIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+ try {
+ return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP);
+ } catch (RemoteStorageException e) {
+ throw new KafkaException(e);
+ }
+ }, file -> {
+ try {
+ TimeIndex index = new TimeIndex(file, startOffset, Integer.MAX_VALUE, false);
+ index.sanityCheck();
+ return index;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ });
+ File txnIndexFile = remoteTransactionIndexFile(cacheDir, remoteLogSegmentMetadata);
+ TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
+ try {
+ return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
+ } catch (RemoteStorageException e) {
+ throw new KafkaException(e);
+ }
+ }, file -> {
+ try {
+ TransactionIndex index = new TransactionIndex(startOffset, file);
+ index.sanityCheck();
+ return index;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ });
+
+ return new Entry(offsetIndex, timeIndex, txnIndex);
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+ lock.readLock().lock();
+ try {
+ return getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public int lookupTimestamp(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long timestamp, long startingOffset) throws IOException {
+ lock.readLock().lock();
+ try {
+ return getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ // Make close idempotent and ensure no more reads are allowed henceforth. The in-progress reads will continue to
+ // completion (release the read lock) and then close will begin executing. Cleaner thread will immediately stop work.
+ if (!isRemoteIndexCacheClosed.getAndSet(true)) {
+ lock.writeLock().lock();
+ try {
+ log.info("Close initiated for RemoteIndexCache. Cache stats={}. Cache entries pending delete={}",
+ internalCache.stats(), expiredIndexes.size());
+ // Initiate shutdown for cleaning thread
+ boolean shutdownRequired = cleanerThread.initiateShutdown();
+ // Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
+ internalCache.asMap().forEach((uuid, entry) -> entry.close());
+ // wait for cleaner thread to shutdown
+ if (shutdownRequired) cleanerThread.awaitShutdown();
+
+ // Note that internal cache does not require explicit cleaning/closing. We don't want to invalidate or cleanup
+ // the cache as both would lead to triggering of removal listener.
+
+ log.info("Close completed for RemoteIndexCache");
+ } catch (InterruptedException e) {
+ throw new KafkaException(e);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+
+ public static class Entry implements AutoCloseable {
+
+ private final OffsetIndex offsetIndex;
+ private final TimeIndex timeIndex;
+ private final TransactionIndex txnIndex;
+
+ // This lock is used to synchronize the cleanup methods and the read methods. It ensures that a cleanup (which
+ // changes the underlying files of the index) is not performed while a read is in progress for the entry. This is
+ // required in addition to the thread-safe cache because, while the cache's thread safety ensures that entries can
+ // be read concurrently, it does not ensure that the underlying files belonging to an entry won't be mutated.
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private boolean cleanStarted = false;
+
+ private boolean markedForCleanup = false;
+
+ public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
+ this.offsetIndex = offsetIndex;
+ this.timeIndex = timeIndex;
+ this.txnIndex = txnIndex;
+ }
+
+ // Visible for testing
+ public OffsetIndex offsetIndex() {
+ return offsetIndex;
+ }
+
+ // Visible for testing
+ public TimeIndex timeIndex() {
+ return timeIndex;
+ }
+
+ // Visible for testing
+ public TransactionIndex txnIndex() {
+ return txnIndex;
+ }
+
+ // Visible for testing
+ public boolean isCleanStarted() {
+ return cleanStarted;
+ }
+
+ // Visible for testing
+ public boolean isMarkedForCleanup() {
+ return markedForCleanup;
+ }
+
+ public OffsetPosition lookupOffset(long targetOffset) {
+ lock.readLock().lock();
+ try {
+ if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
+ else return offsetIndex.lookup(targetOffset);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public OffsetPosition lookupTimestamp(long timestamp, long startingOffset) throws IOException {
+ lock.readLock().lock();
+ try {
+ if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
+
+ TimestampOffset timestampOffset = timeIndex.lookup(timestamp);
+ return offsetIndex.lookup(Math.max(startingOffset, timestampOffset.offset));
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void markForCleanup() throws IOException {
+ lock.writeLock().lock();
+ try {
+ if (!markedForCleanup) {
+ markedForCleanup = true;
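+ // Rename each index file to have the .deleted suffix. The files stay on disk until cleanup()
+ // actually deletes them.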
+ offsetIndex.renameTo(new File(Utils.replaceSuffix(offsetIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+ timeIndex.renameTo(new File(Utils.replaceSuffix(timeIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+ txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void cleanup() throws IOException {
+ lock.writeLock().lock();
+ try {
+ markForCleanup();
+ // no-op if cleanup has already started
+ if (!cleanStarted) {
+ cleanStarted = true;
+
+ List<StorageAction<Void, Exception>> actions = Arrays.asList(() -> {
+ offsetIndex.deleteIfExists();
+ return null;
+ }, () -> {
+ timeIndex.deleteIfExists();
+ return null;
+ }, () -> {
+ txnIndex.deleteIfExists();
+ return null;
+ });
+
+ tryAll(actions);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ lock.writeLock().lock();
+ try {
+ Utils.closeQuietly(offsetIndex, "OffsetIndex");
+ Utils.closeQuietly(timeIndex, "TimeIndex");
+ Utils.closeQuietly(txnIndex, "TransactionIndex");
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Entry{" +
+ "offsetIndex=" + offsetIndex.file().getName() +
+ ", timeIndex=" + timeIndex.file().getName() +
+ ", txnIndex=" + txnIndex.file().getName() +
+ '}';
+ }
+ }
+
+ /**
+ * Executes each action in the `actions` list even if one or more of them throws an exception. If any action
+ * throws an IOException, the first such IOException is rethrown after all other encountered exceptions have been
+ * added to it as suppressed. Otherwise, the first exception encountered is wrapped in a KafkaException, which is
+ * thrown with the remaining exceptions added to it as suppressed.
+ *
+ * @param actions actions to be executed
+ * @throws IOException the first IOException encountered while executing the actions
+ * @throws KafkaException if no IOException occurred; wraps the first non-IO exception encountered
+ */
+ private static void tryAll(List<StorageAction<Void, Exception>> actions) throws IOException {
+ IOException ioException = null;
+ List<Exception> exceptions = Collections.emptyList();
+ for (StorageAction<Void, Exception> action : actions) {
+ try {
+ action.execute();
+ } catch (IOException e) {
+ if (ioException == null) {
+ ioException = e;
+ } else {
+ if (exceptions.isEmpty()) {
+ exceptions = new ArrayList<>();
+ }
+ exceptions.add(e);
+ }
+ } catch (Exception e) {
+ if (exceptions.isEmpty()) {
+ exceptions = new ArrayList<>();
+ }
+ exceptions.add(e);
+ }
+ }
+
+ if (ioException != null) {
+ for (Exception exception : exceptions) {
+ ioException.addSuppressed(exception);
+ }
+ throw ioException;
+ } else if (!exceptions.isEmpty()) {
+ Iterator<Exception> iterator = exceptions.iterator();
+ KafkaException kafkaException = new KafkaException(iterator.next());
+ while (iterator.hasNext()) {
+ kafkaException.addSuppressed(iterator.next());
+ }
+
+ throw kafkaException;
+ }
+ }
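+ // For example (with two hypothetical actions): if the first action throws an IOException and the second throws
+ // a RuntimeException, both actions are executed, and the IOException is rethrown with the RuntimeException
+ // attached to it via Throwable#addSuppressed.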
+
+ private static Uuid remoteLogSegmentIdFromRemoteIndexFileName(String fileName) {
+ int underscoreIndex = fileName.indexOf("_");
+ int dotIndex = fileName.indexOf(".");
+ return Uuid.fromString(fileName.substring(underscoreIndex + 1, dotIndex));
+ }
+
+ private static long offsetFromRemoteIndexFileName(String fileName) {
+ return Long.parseLong(fileName.substring(0, fileName.indexOf("_")));
+ }
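+ // Worked example for the two parsers above, using a hypothetical remote index file named
+ // "45_CvcUBFBgRsmNDVWCzJzxAQ.index": offsetFromRemoteIndexFileName returns 45 and
+ // remoteLogSegmentIdFromRemoteIndexFileName returns the Uuid parsed from "CvcUBFBgRsmNDVWCzJzxAQ".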
+
+ /**
+ * Generates the file name prefix for the on-disk representation of remote indexes.
+ * <p>
+ * An example of a file name prefix is 45_fooid, where 45 is the base offset of the segment and
+ * fooid is the unique {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId}.
+ *
+ * @param remoteLogSegmentMetadata remote segment for the remote indexes
+ * @return the string to be used as the prefix for the on-disk representation of the remote indexes
+ */
+ private static String generateFileNamePrefixForIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ long startOffset = remoteLogSegmentMetadata.startOffset();
+ Uuid segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id();
+ // Uuid.toString uses URL-safe base64 encoding, which is safe for file names and URLs.
+ return startOffset + "_" + segmentId.toString();
+ }
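+ // With the prefix above, the on-disk index files for a segment with base offset 45 and the hypothetical
+ // segment id "fooid" are named:
+ //   45_fooid.index      (offset index)
+ //   45_fooid.timeindex  (time index)
+ //   45_fooid.txnindex   (transaction index)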
+
+ public static File remoteOffsetIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ return new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata));
+ }
+
+ public static String remoteOffsetIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
+ return prefix + LogFileUtils.INDEX_FILE_SUFFIX;
+ }
+
+ public static File remoteTimeIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ return new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata));
+ }
+
+ public static String remoteTimeIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + TIME_INDEX_FILE_SUFFIX;
+ }
+
+ public static File remoteTransactionIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ return new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata));
+ }
+
+ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+ }
+
+}
\ No newline at end of file