You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/05/24 17:42:29 UTC
[kafka] branch trunk updated: MINOR: Add log identifier/prefix
printing in Log layer static functions (#10742)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c92e62a MINOR: Add log identifier/prefix printing in Log layer static functions (#10742)
c92e62a is described below
commit c92e62a67fc8efe51550492592039986994c81fb
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Mon May 24 10:39:46 2021 -0700
MINOR: Add log identifier/prefix printing in Log layer static functions (#10742)
When #10478 was merged, we accidentally lost the identifier/prefix string that we used to previously log to stderr from some of the functions in the Log class. In this PR, I have reinstated the identifier/prefix logging in these functions, so that the debuggability is restored.
Reviewers: Luke Chen <sh...@gmail.com>, Cong Ding <co...@ccding.com>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/Log.scala | 57 ++++++++++++++--------
core/src/main/scala/kafka/log/LogLoader.scala | 41 +++++++++-------
.../unit/kafka/cluster/PartitionLockTest.scala | 2 +-
.../scala/unit/kafka/cluster/PartitionTest.scala | 2 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 12 ++---
.../unit/kafka/server/ReplicaManagerTest.scala | 2 +-
.../scala/unit/kafka/utils/SchedulerTest.scala | 2 +-
9 files changed, 71 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef0d6ae..0a4851b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -557,7 +557,7 @@ class Log(@volatile private var _dir: File,
}
private def initializeLeaderEpochCache(): Unit = lock synchronized {
- leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion)
+ leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
}
private def updateLogEndOffset(offset: Long): Unit = {
@@ -592,7 +592,7 @@ class Log(@volatile private var _dir: File,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
Log.rebuildProducerState(producerStateManager, segments, logStartOffset, lastOffset, recordVersion, time,
- reloadFromCleanShutdown = false)
+ reloadFromCleanShutdown = false, logIdent)
}
def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
@@ -1888,14 +1888,14 @@ class Log(@volatile private var _dir: File,
private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
- config, scheduler, logDirFailureChannel, producerStateManager)
+ config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
lock synchronized {
checkIfMemoryMappedBufferClosed()
Log.replaceSegments(segments, newSegments, oldSegments, isRecoveredSwapFile, dir, topicPartition,
- config, scheduler, logDirFailureChannel, producerStateManager)
+ config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
}
@@ -1937,7 +1937,7 @@ class Log(@volatile private var _dir: File,
}
private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
- Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
+ Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
}
@@ -2005,7 +2005,12 @@ object Log extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = Log.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
+ dir,
+ topicPartition,
+ logDirFailureChannel,
+ config.messageFormatVersion.recordVersion,
+ s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
dir,
@@ -2226,12 +2231,14 @@ object Log extends Logging {
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
+ * @param logPrefix The logging prefix
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
- recordVersion: RecordVersion): Option[LeaderEpochFileCache] = {
+ recordVersion: RecordVersion,
+ logPrefix: String): Option[LeaderEpochFileCache] = {
val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
def newLeaderEpochFileCache(): LeaderEpochFileCache = {
@@ -2246,7 +2253,7 @@ object Log extends Logging {
None
if (currentCache.exists(_.nonEmpty))
- warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
+ warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
Files.deleteIfExists(leaderEpochFile.toPath)
None
@@ -2293,6 +2300,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
+ * @param logPrefix The logging prefix
*/
private[log] def replaceSegments(existingSegments: LogSegments,
newSegments: Seq[LogSegment],
@@ -2303,7 +2311,8 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
- producerStateManager: ProducerStateManager): Unit = {
+ producerStateManager: ProducerStateManager,
+ logPrefix: String): Unit = {
val sortedNewSegments = newSegments.sortBy(_.baseOffset)
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
// but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
@@ -2332,7 +2341,8 @@ object Log extends Logging {
config,
scheduler,
logDirFailureChannel,
- producerStateManager)
+ producerStateManager,
+ logPrefix)
}
// okay we are safe now, remove the swap suffix
sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
@@ -2359,7 +2369,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
- *
+ * @param logPrefix The logging prefix
* @throws IOException if the file can't be renamed and still exists
*/
private[log] def deleteSegmentFiles(segmentsToDelete: Iterable[LogSegment],
@@ -2370,11 +2380,12 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
- producerStateManager: ProducerStateManager): Unit = {
+ producerStateManager: ProducerStateManager,
+ logPrefix: String): Unit = {
segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
def deleteSegments(): Unit = {
- info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
+ info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
val parentDir = dir.getParent
maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
segmentsToDelete.foreach { segment =>
@@ -2429,6 +2440,7 @@ object Log extends Logging {
* @param time The time instance used for checking the clock
* @param reloadFromCleanShutdown True if the producer state is being built after a clean shutdown,
* false otherwise.
+ * @param logPrefix The logging prefix
*/
private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
segments: LogSegments,
@@ -2436,7 +2448,8 @@ object Log extends Logging {
lastOffset: Long,
recordVersion: RecordVersion,
time: Time,
- reloadFromCleanShutdown: Boolean): Unit = {
+ reloadFromCleanShutdown: Boolean,
+ logPrefix: String): Unit = {
val allSegments = segments.values
val offsetsToSnapshot =
if (allSegments.nonEmpty) {
@@ -2445,7 +2458,7 @@ object Log extends Logging {
} else {
Seq(Some(lastOffset))
}
- info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
+ info(s"${logPrefix}Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
@@ -2469,7 +2482,7 @@ object Log extends Logging {
producerStateManager.takeSnapshot()
}
} else {
- info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
+ info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
val producerStateLoadStart = time.milliseconds()
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
@@ -2508,7 +2521,7 @@ object Log extends Logging {
}
producerStateManager.updateMapEndOffset(lastOffset)
producerStateManager.takeSnapshot()
- info(s"Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
+ info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
}
}
@@ -2535,6 +2548,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
+ * @param logPrefix The logging prefix
* @return List of new segments that replace the input segment
*/
private[log] def splitOverflowedSegment(segment: LogSegment,
@@ -2544,11 +2558,12 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
- producerStateManager: ProducerStateManager): List[LogSegment] = {
+ producerStateManager: ProducerStateManager,
+ logPrefix: String): List[LogSegment] = {
require(Log.isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
require(segment.hasOverflow, "Split operation is only permitted for segments with overflow")
- info(s"Splitting overflowed segment $segment")
+ info(s"${logPrefix}Splitting overflowed segment $segment")
val newSegments = ListBuffer[LogSegment]()
try {
@@ -2581,9 +2596,9 @@ object Log extends Logging {
s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments")
// replace old segment with new ones
- info(s"Replacing overflowed segment $segment with split segments $newSegments")
+ info(s"${logPrefix}Replacing overflowed segment $segment with split segments $newSegments")
replaceSegments(existingSegments, newSegments.toList, List(segment), isRecoveredSwapFile = false,
- dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
+ dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, logPrefix)
newSegments.toList
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 6b28ec5..bfadb78 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -67,7 +67,7 @@ case class LoadLogParams(dir: File,
maxProducerIdExpirationMs: Int,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager) {
- val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}]"
+ val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "
}
/**
@@ -152,7 +152,8 @@ object LogLoader extends Logging {
nextOffset,
params.config.messageFormatVersion.recordVersion,
params.time,
- reloadFromCleanShutdown = params.hadCleanShutdown)
+ reloadFromCleanShutdown = params.hadCleanShutdown,
+ params.logIdentifier)
val activeSegment = params.segments.lastSegment.get
LoadedLogOffsets(
@@ -172,7 +173,7 @@ object LogLoader extends Logging {
private def removeTempFilesAndCollectSwapFiles(params: LoadLogParams): Set[File] = {
def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
- info(s"${params.logIdentifier} Deleting index files with suffix $suffix for baseFile $baseFile")
+ info(s"${params.logIdentifier}Deleting index files with suffix $suffix for baseFile $baseFile")
val offset = offsetFromFile(baseFile)
Files.deleteIfExists(Log.offsetIndexFile(params.dir, offset, suffix).toPath)
Files.deleteIfExists(Log.timeIndexFile(params.dir, offset, suffix).toPath)
@@ -188,7 +189,7 @@ object LogLoader extends Logging {
throw new IOException(s"Could not read file $file")
val filename = file.getName
if (filename.endsWith(DeletedFileSuffix)) {
- debug(s"${params.logIdentifier} Deleting stray temporary file ${file.getAbsolutePath}")
+ debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
} else if (filename.endsWith(CleanedFileSuffix)) {
minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
@@ -198,7 +199,7 @@ object LogLoader extends Logging {
// if a log, delete the index files, complete the swap operation later
// if an index just delete the index files, they will be rebuilt
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
- info(s"${params.logIdentifier} Found file ${file.getAbsolutePath} from interrupted swap operation.")
+ info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.")
if (Log.isIndexFile(baseFile)) {
deleteIndicesIfExist(baseFile)
} else if (Log.isLogFile(baseFile)) {
@@ -213,7 +214,7 @@ object LogLoader extends Logging {
// for more details about the split operation.
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
invalidSwapFiles.foreach { file =>
- debug(s"${params.logIdentifier} Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
+ debug(s"${params.logIdentifier}Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
deleteIndicesIfExist(baseFile, SwapFileSuffix)
Files.deleteIfExists(file.toPath)
@@ -221,7 +222,7 @@ object LogLoader extends Logging {
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
cleanFiles.foreach { file =>
- debug(s"${params.logIdentifier} Deleting stray .clean file ${file.getAbsolutePath}")
+ debug(s"${params.logIdentifier}Deleting stray .clean file ${file.getAbsolutePath}")
Files.deleteIfExists(file.toPath)
}
@@ -245,7 +246,7 @@ object LogLoader extends Logging {
return fn
} catch {
case e: LogSegmentOffsetOverflowException =>
- info(s"${params.logIdentifier} Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
+ info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
Log.splitOverflowedSegment(
e.segment,
params.segments,
@@ -254,7 +255,8 @@ object LogLoader extends Logging {
params.config,
params.scheduler,
params.logDirFailureChannel,
- params.producerStateManager)
+ params.producerStateManager,
+ params.logIdentifier)
}
}
throw new IllegalStateException()
@@ -280,7 +282,7 @@ object LogLoader extends Logging {
val offset = offsetFromFile(file)
val logFile = Log.logFile(params.dir, offset)
if (!logFile.exists) {
- warn(s"${params.logIdentifier} Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
+ warn(s"${params.logIdentifier}Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) {
@@ -297,11 +299,11 @@ object LogLoader extends Logging {
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =>
- error(s"${params.logIdentifier} Could not find offset index file corresponding to log file" +
+ error(s"${params.logIdentifier}Could not find offset index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")
recoverSegment(segment, params)
case e: CorruptIndexException =>
- warn(s"${params.logIdentifier} Found a corrupted index file corresponding to log file" +
+ warn(s"${params.logIdentifier}Found a corrupted index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath} due to ${e.getMessage}}, recovering segment and" +
" rebuilding index files...")
recoverSegment(segment, params)
@@ -330,7 +332,8 @@ object LogLoader extends Logging {
segment.baseOffset,
params.config.messageFormatVersion.recordVersion,
params.time,
- reloadFromCleanShutdown = false)
+ reloadFromCleanShutdown = false,
+ params.logIdentifier)
val bytesTruncated = segment.recover(producerStateManager, params.leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
@@ -390,7 +393,8 @@ object LogLoader extends Logging {
params.config,
params.scheduler,
params.logDirFailureChannel,
- params.producerStateManager)
+ params.producerStateManager,
+ params.logIdentifier)
}
}
@@ -434,20 +438,20 @@ object LogLoader extends Logging {
while (unflushed.hasNext && !truncated) {
val segment = unflushed.next()
- info(s"${params.logIdentifier} Recovering unflushed segment ${segment.baseOffset}")
+ info(s"${params.logIdentifier}Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
recoverSegment(segment, params)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
- warn(s"${params.logIdentifier} Found invalid offset during recovery. Deleting the" +
+ warn(s"${params.logIdentifier}Found invalid offset during recovery. Deleting the" +
s" corrupt segment and creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
- warn(s"${params.logIdentifier} Corruption found in segment ${segment.baseOffset}," +
+ warn(s"${params.logIdentifier}Corruption found in segment ${segment.baseOffset}," +
s" truncating to offset ${segment.readNextOffset}")
removeAndDeleteSegmentsAsync(unflushed.toList, params)
truncated = true
@@ -519,7 +523,8 @@ object LogLoader extends Logging {
params.config,
params.scheduler,
params.logDirFailureChannel,
- params.producerStateManager)
+ params.producerStateManager,
+ params.logIdentifier)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8bb54c4..0cbeec9 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -283,7 +283,7 @@ class PartitionLockTest extends Logging {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion, "")
val maxProducerIdExpirationMs = 60 * 60 * 1000
val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8e492ac..7030715 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -236,7 +236,7 @@ class PartitionTest extends AbstractPartitionTest {
val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
val logDirFailureChannel = new LogDirFailureChannel(1)
val segments = new LogSegments(log.topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion, "")
val maxProducerIdExpirationMs = 60 * 60 * 1000
val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3cb32c0..fbdbe20 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends Logging {
val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
val maxProducerIdExpirationMs = 60 * 60 * 1000
val segments = new LogSegments(tp)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
tpDir,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 99ff1aa..c137eab 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -105,7 +105,7 @@ class LogCleanerTest {
val logDirFailureChannel = new LogDirFailureChannel(10)
val maxProducerIdExpirationMs = 60 * 60 * 1000
val logSegments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
dir,
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 0546db4..d59ed1d 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -94,7 +94,7 @@ class LogLoaderTest {
val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
val maxProducerIdExpirationMs = 60 * 60 * 1000
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time,
logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
@@ -264,7 +264,7 @@ class LogLoaderTest {
super.add(wrapper)
}
}
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
val loadLogParams = LoadLogParams(
logDir,
@@ -337,7 +337,7 @@ class LogLoaderTest {
val config = LogConfig(new Properties())
val maxProducerIdExpirationMs = 300000
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val offsets = LogLoader.load(LoadLogParams(
logDir,
topicPartition,
@@ -471,7 +471,7 @@ class LogLoaderTest {
val maxProducerIdExpirationMs = 300000
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val offsets = LogLoader.load(LoadLogParams(
logDir,
topicPartition,
@@ -532,7 +532,7 @@ class LogLoaderTest {
val maxProducerIdExpirationMs = 300000
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val offsets = LogLoader.load(LoadLogParams(
logDir,
topicPartition,
@@ -595,7 +595,7 @@ class LogLoaderTest {
val maxProducerIdExpirationMs = 300000
val logDirFailureChannel = null
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
val offsets = LogLoader.load(LoadLogParams(
logDir,
topicPartition,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8081d0b..d1d3860 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1486,7 +1486,7 @@ class ReplicaManagerTest {
val tp = new TopicPartition(topic, topicPartition)
val maxProducerIdExpirationMs = 30000
val segments = new LogSegments(tp)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
logDir,
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 876388d..93362e2 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -123,7 +123,7 @@ class SchedulerTest {
val topicPartition = Log.parseTopicPartitionName(logDir)
val logDirFailureChannel = new LogDirFailureChannel(10)
val segments = new LogSegments(topicPartition)
- val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+ val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
logDir,