Posted to commits@kafka.apache.org by ju...@apache.org on 2021/05/24 17:42:29 UTC

[kafka] branch trunk updated: MINOR: Add log identifier/prefix printing in Log layer static functions (#10742)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c92e62a  MINOR: Add log identifier/prefix printing in Log layer static functions (#10742)
c92e62a is described below

commit c92e62a67fc8efe51550492592039986994c81fb
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Mon May 24 10:39:46 2021 -0700

    MINOR: Add log identifier/prefix printing in Log layer static functions (#10742)
    
    When #10478 was merged, we accidentally lost the identifier/prefix string that some of the functions in the Log class previously logged to stderr. In this PR, I have reinstated the identifier/prefix logging in those functions so that debuggability is restored; a sketch of the pattern follows below.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Cong Ding <co...@ccding.com>, Jun Rao <ju...@gmail.com>
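
The fix follows a simple pattern: instance methods of Log pick up a per-partition logIdent from the kafka.utils.Logging trait, but the static helpers in the companion object carry no instance state, so each helper now takes an explicit logPrefix parameter that it prepends to every message. A minimal sketch of the pattern, not the actual Kafka code (Widget/doWork are hypothetical stand-ins, assuming Logging's settable logIdent and info()):

    import kafka.utils.Logging

    class Widget(name: String) extends Logging {
      // Instance side: the Logging trait prepends logIdent to messages automatically.
      this.logIdent = s"[Widget name=$name] "

      def work(): Unit = Widget.doWork(logIdent)  // thread the prefix into the static helper
    }

    object Widget extends Logging {
      // Static side: no per-instance state, so the prefix is passed in explicitly.
      def doWork(logPrefix: String): Unit =
        info(s"${logPrefix}doing work")  // the prefix already carries its trailing space
    }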
---
 core/src/main/scala/kafka/log/Log.scala            | 57 ++++++++++++++--------
 core/src/main/scala/kafka/log/LogLoader.scala      | 41 +++++++++-------
 .../unit/kafka/cluster/PartitionLockTest.scala     |  2 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  2 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 12 ++---
 .../unit/kafka/server/ReplicaManagerTest.scala     |  2 +-
 .../scala/unit/kafka/utils/SchedulerTest.scala     |  2 +-
 9 files changed, 71 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef0d6ae..0a4851b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -557,7 +557,7 @@ class Log(@volatile private var _dir: File,
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
-    leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion)
+    leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
   }
 
   private def updateLogEndOffset(offset: Long): Unit = {
@@ -592,7 +592,7 @@ class Log(@volatile private var _dir: File,
                                    producerStateManager: ProducerStateManager): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
     Log.rebuildProducerState(producerStateManager, segments, logStartOffset, lastOffset, recordVersion, time,
-      reloadFromCleanShutdown = false)
+      reloadFromCleanShutdown = false, logIdent)
   }
 
   def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
@@ -1888,14 +1888,14 @@ class Log(@volatile private var _dir: File,
 
   private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
     Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
-      config, scheduler, logDirFailureChannel, producerStateManager)
+      config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
   }
 
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
     lock synchronized {
       checkIfMemoryMappedBufferClosed()
       Log.replaceSegments(segments, newSegments, oldSegments, isRecoveredSwapFile, dir, topicPartition,
-        config, scheduler, logDirFailureChannel, producerStateManager)
+        config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
     }
   }
 
@@ -1937,7 +1937,7 @@ class Log(@volatile private var _dir: File,
   }
 
   private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
-    Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
+    Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
   }
 
 }
@@ -2005,7 +2005,12 @@ object Log extends Logging {
     Files.createDirectories(dir.toPath)
     val topicPartition = Log.parseTopicPartitionName(dir)
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
+      dir,
+      topicPartition,
+      logDirFailureChannel,
+      config.messageFormatVersion.recordVersion,
+      s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
@@ -2226,12 +2231,14 @@ object Log extends Logging {
    * @param topicPartition The topic partition
    * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
    * @param recordVersion The record version
+   * @param logPrefix The logging prefix
    * @return The new LeaderEpochFileCache instance (if created), none otherwise
    */
   def maybeCreateLeaderEpochCache(dir: File,
                                   topicPartition: TopicPartition,
                                   logDirFailureChannel: LogDirFailureChannel,
-                                  recordVersion: RecordVersion): Option[LeaderEpochFileCache] = {
+                                  recordVersion: RecordVersion,
+                                  logPrefix: String): Option[LeaderEpochFileCache] = {
     val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)
 
     def newLeaderEpochFileCache(): LeaderEpochFileCache = {
@@ -2246,7 +2253,7 @@ object Log extends Logging {
         None
 
       if (currentCache.exists(_.nonEmpty))
-        warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
+        warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
 
       Files.deleteIfExists(leaderEpochFile.toPath)
       None
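
For reference, the prefix built in the apply() hunk above ends with a trailing space, so the warn() in this hunk can interpolate it with no separator. An illustrative sketch of what gets rendered, with hypothetical partition and directory values:

    import java.io.File
    import org.apache.kafka.common.TopicPartition

    // Illustrative values only.
    val topicPartition = new TopicPartition("payments", 0)
    val dir = new File("/var/kafka-logs/payments-0")
    val logPrefix = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
    // The warn() above would then log roughly:
    //   [Log partition=payments-0, dir=/var/kafka-logs] Deleting non-empty leader epoch cache
    //   due to incompatible message format V1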
@@ -2293,6 +2300,7 @@ object Log extends Logging {
    * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
    * @param producerStateManager The ProducerStateManager instance (if any) containing state associated
    *                             with the existingSegments
+   * @param logPrefix The logging prefix
    */
     private[log] def replaceSegments(existingSegments: LogSegments,
                                      newSegments: Seq[LogSegment],
@@ -2303,7 +2311,8 @@ object Log extends Logging {
                                      config: LogConfig,
                                      scheduler: Scheduler,
                                      logDirFailureChannel: LogDirFailureChannel,
-                                     producerStateManager: ProducerStateManager): Unit = {
+                                     producerStateManager: ProducerStateManager,
+                                     logPrefix: String): Unit = {
       val sortedNewSegments = newSegments.sortBy(_.baseOffset)
       // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
       // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
@@ -2332,7 +2341,8 @@ object Log extends Logging {
           config,
           scheduler,
           logDirFailureChannel,
-          producerStateManager)
+          producerStateManager,
+          logPrefix)
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
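
These replaceSegments hunks thread the prefix through the crash-safe swap protocol: upstream, the new segments are staged under the .swap suffix, the replaced segments are deleted (the deleteSegmentFiles call above), and only then is the suffix stripped, so a crash mid-replacement leaves .swap files that LogLoader can recover (see the "interrupted swap operation" hunk further down). A condensed sketch of that ordering, with the deletion helper hypothetical:

    // Condensed sketch of the commit order; deleteOld stands in for the real
    // asyncDeleteSegment/deleteSegmentFiles machinery.
    sortedNewSegments.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)) // 1. stage as .swap
    sortedOldSegments.foreach(deleteOld)                                                       // 2. drop replaced segments
    sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))                    // 3. commit: strip .swap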
@@ -2359,7 +2369,7 @@ object Log extends Logging {
    * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
    * @param producerStateManager The ProducerStateManager instance (if any) containing state associated
    *                             with the existingSegments
-   *
+   * @param logPrefix The logging prefix
    * @throws IOException if the file can't be renamed and still exists
    */
   private[log] def deleteSegmentFiles(segmentsToDelete: Iterable[LogSegment],
@@ -2370,11 +2380,12 @@ object Log extends Logging {
                                       config: LogConfig,
                                       scheduler: Scheduler,
                                       logDirFailureChannel: LogDirFailureChannel,
-                                      producerStateManager: ProducerStateManager): Unit = {
+                                      producerStateManager: ProducerStateManager,
+                                      logPrefix: String): Unit = {
     segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
-      info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
+      info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
       val parentDir = dir.getParent
       maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
         segmentsToDelete.foreach { segment =>
@@ -2429,6 +2440,7 @@ object Log extends Logging {
    * @param time The time instance used for checking the clock
    * @param reloadFromCleanShutdown True if the producer state is being built after a clean shutdown,
    *                                false otherwise.
+   * @param logPrefix The logging prefix
    */
   private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
                                         segments: LogSegments,
@@ -2436,7 +2448,8 @@ object Log extends Logging {
                                         lastOffset: Long,
                                         recordVersion: RecordVersion,
                                         time: Time,
-                                        reloadFromCleanShutdown: Boolean): Unit = {
+                                        reloadFromCleanShutdown: Boolean,
+                                        logPrefix: String): Unit = {
     val allSegments = segments.values
     val offsetsToSnapshot =
       if (allSegments.nonEmpty) {
@@ -2445,7 +2458,7 @@ object Log extends Logging {
       } else {
         Seq(Some(lastOffset))
       }
-    info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
+    info(s"${logPrefix}Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
 
     // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
     // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
@@ -2469,7 +2482,7 @@ object Log extends Logging {
         producerStateManager.takeSnapshot()
       }
     } else {
-      info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
+      info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
       val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
       val producerStateLoadStart = time.milliseconds()
       producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
@@ -2508,7 +2521,7 @@ object Log extends Logging {
       }
       producerStateManager.updateMapEndOffset(lastOffset)
       producerStateManager.takeSnapshot()
-      info(s"Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
+      info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
         s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
     }
   }
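
The recovery-time message in the hunk above is assembled from two wall-clock deltas taken with time.milliseconds(). A minimal sketch of the same two-phase timing pattern, with hypothetical phase helpers and assuming org.apache.kafka.common.utils.Time:

    val loadStart = time.milliseconds()
    loadSnapshots()                          // hypothetical: phase 1, snapshot load
    val recoveryStart = time.milliseconds()
    recoverSegments()                        // hypothetical: phase 2, segment recovery
    info(s"${logPrefix}Producer state recovery took ${recoveryStart - loadStart}ms for snapshot load " +
      s"and ${time.milliseconds() - recoveryStart}ms for segment recovery")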
@@ -2535,6 +2548,7 @@ object Log extends Logging {
    * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
    * @param producerStateManager The ProducerStateManager instance (if any) containing state associated
    *                             with the existingSegments
+   * @param logPrefix The logging prefix
    * @return List of new segments that replace the input segment
    */
   private[log] def splitOverflowedSegment(segment: LogSegment,
@@ -2544,11 +2558,12 @@ object Log extends Logging {
                                           config: LogConfig,
                                           scheduler: Scheduler,
                                           logDirFailureChannel: LogDirFailureChannel,
-                                          producerStateManager: ProducerStateManager): List[LogSegment] = {
+                                          producerStateManager: ProducerStateManager,
+                                          logPrefix: String): List[LogSegment] = {
     require(Log.isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
     require(segment.hasOverflow, "Split operation is only permitted for segments with overflow")
 
-    info(s"Splitting overflowed segment $segment")
+    info(s"${logPrefix}Splitting overflowed segment $segment")
 
     val newSegments = ListBuffer[LogSegment]()
     try {
@@ -2581,9 +2596,9 @@ object Log extends Logging {
           s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments")
 
       // replace old segment with new ones
-      info(s"Replacing overflowed segment $segment with split segments $newSegments")
+      info(s"${logPrefix}Replacing overflowed segment $segment with split segments $newSegments")
       replaceSegments(existingSegments, newSegments.toList, List(segment), isRecoveredSwapFile = false,
-        dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
+        dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, logPrefix)
       newSegments.toList
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index 6b28ec5..bfadb78 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -67,7 +67,7 @@ case class LoadLogParams(dir: File,
                          maxProducerIdExpirationMs: Int,
                          leaderEpochCache: Option[LeaderEpochFileCache],
                          producerStateManager: ProducerStateManager) {
-  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}]"
+  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "
 }
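
The only change in this hunk is the trailing space appended to logIdentifier; every call site below drops its explicit separator in exchange, matching the prefix convention used in Log.scala. Before and after, as a sketch:

    // Before: the prefix had no trailing space, so call sites inserted one.
    info(s"${params.logIdentifier} Deleting stray .clean file ${file.getAbsolutePath}")
    // After: the prefix carries the space, so call sites interpolate it directly.
    info(s"${params.logIdentifier}Deleting stray .clean file ${file.getAbsolutePath}")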
 
 /**
@@ -152,7 +152,8 @@ object LogLoader extends Logging {
       nextOffset,
       params.config.messageFormatVersion.recordVersion,
       params.time,
-      reloadFromCleanShutdown = params.hadCleanShutdown)
+      reloadFromCleanShutdown = params.hadCleanShutdown,
+      params.logIdentifier)
 
     val activeSegment = params.segments.lastSegment.get
     LoadedLogOffsets(
@@ -172,7 +173,7 @@ object LogLoader extends Logging {
   private def removeTempFilesAndCollectSwapFiles(params: LoadLogParams): Set[File] = {
 
     def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
-      info(s"${params.logIdentifier} Deleting index files with suffix $suffix for baseFile $baseFile")
+      info(s"${params.logIdentifier}Deleting index files with suffix $suffix for baseFile $baseFile")
       val offset = offsetFromFile(baseFile)
       Files.deleteIfExists(Log.offsetIndexFile(params.dir, offset, suffix).toPath)
       Files.deleteIfExists(Log.timeIndexFile(params.dir, offset, suffix).toPath)
@@ -188,7 +189,7 @@ object LogLoader extends Logging {
         throw new IOException(s"Could not read file $file")
       val filename = file.getName
       if (filename.endsWith(DeletedFileSuffix)) {
-        debug(s"${params.logIdentifier} Deleting stray temporary file ${file.getAbsolutePath}")
+        debug(s"${params.logIdentifier}Deleting stray temporary file ${file.getAbsolutePath}")
         Files.deleteIfExists(file.toPath)
       } else if (filename.endsWith(CleanedFileSuffix)) {
         minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset)
@@ -198,7 +199,7 @@ object LogLoader extends Logging {
         // if a log, delete the index files, complete the swap operation later
         // if an index just delete the index files, they will be rebuilt
         val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
-        info(s"${params.logIdentifier} Found file ${file.getAbsolutePath} from interrupted swap operation.")
+        info(s"${params.logIdentifier}Found file ${file.getAbsolutePath} from interrupted swap operation.")
         if (Log.isIndexFile(baseFile)) {
           deleteIndicesIfExist(baseFile)
         } else if (Log.isLogFile(baseFile)) {
@@ -213,7 +214,7 @@ object LogLoader extends Logging {
     // for more details about the split operation.
     val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)
     invalidSwapFiles.foreach { file =>
-      debug(s"${params.logIdentifier} Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
+      debug(s"${params.logIdentifier}Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset")
       val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
       deleteIndicesIfExist(baseFile, SwapFileSuffix)
       Files.deleteIfExists(file.toPath)
@@ -221,7 +222,7 @@ object LogLoader extends Logging {
 
     // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
     cleanFiles.foreach { file =>
-      debug(s"${params.logIdentifier} Deleting stray .clean file ${file.getAbsolutePath}")
+      debug(s"${params.logIdentifier}Deleting stray .clean file ${file.getAbsolutePath}")
       Files.deleteIfExists(file.toPath)
     }
 
@@ -245,7 +246,7 @@ object LogLoader extends Logging {
         return fn
       } catch {
         case e: LogSegmentOffsetOverflowException =>
-          info(s"${params.logIdentifier} Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
+          info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
           Log.splitOverflowedSegment(
             e.segment,
             params.segments,
@@ -254,7 +255,8 @@ object LogLoader extends Logging {
             params.config,
             params.scheduler,
             params.logDirFailureChannel,
-            params.producerStateManager)
+            params.producerStateManager,
+            params.logIdentifier)
       }
     }
     throw new IllegalStateException()
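
For context, this hunk sits inside LogLoader's retry-on-overflow helper: the operation is attempted, and if a segment's offsets overflow, the segment is split and the whole operation retried from the top. A condensed sketch of that control flow, with the split call abbreviated to a hypothetical splitSegment:

    def retryOnOffsetOverflow[T](params: LoadLogParams)(fn: => T): T = {
      while (true) {
        try {
          return fn  // success: hand back the result
        } catch {
          case e: LogSegmentOffsetOverflowException =>
            info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
            splitSegment(e.segment, params)  // stands in for the Log.splitOverflowedSegment(...) call above
        }
      }
      throw new IllegalStateException()  // unreachable, but needed since while(true) has type Unit
    }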
@@ -280,7 +282,7 @@ object LogLoader extends Logging {
         val offset = offsetFromFile(file)
         val logFile = Log.logFile(params.dir, offset)
         if (!logFile.exists) {
-          warn(s"${params.logIdentifier} Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
+          warn(s"${params.logIdentifier}Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
           Files.deleteIfExists(file.toPath)
         }
       } else if (isLogFile(file)) {
@@ -297,11 +299,11 @@ object LogLoader extends Logging {
         try segment.sanityCheck(timeIndexFileNewlyCreated)
         catch {
           case _: NoSuchFileException =>
-            error(s"${params.logIdentifier} Could not find offset index file corresponding to log file" +
+            error(s"${params.logIdentifier}Could not find offset index file corresponding to log file" +
               s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")
             recoverSegment(segment, params)
           case e: CorruptIndexException =>
-            warn(s"${params.logIdentifier} Found a corrupted index file corresponding to log file" +
+            warn(s"${params.logIdentifier}Found a corrupted index file corresponding to log file" +
               s" ${segment.log.file.getAbsolutePath} due to ${e.getMessage}}, recovering segment and" +
               " rebuilding index files...")
             recoverSegment(segment, params)
@@ -330,7 +332,8 @@ object LogLoader extends Logging {
       segment.baseOffset,
       params.config.messageFormatVersion.recordVersion,
       params.time,
-      reloadFromCleanShutdown = false)
+      reloadFromCleanShutdown = false,
+      params.logIdentifier)
     val bytesTruncated = segment.recover(producerStateManager, params.leaderEpochCache)
     // once we have recovered the segment's data, take a snapshot to ensure that we won't
     // need to reload the same segment again while recovering another segment.
@@ -390,7 +393,8 @@ object LogLoader extends Logging {
         params.config,
         params.scheduler,
         params.logDirFailureChannel,
-        params.producerStateManager)
+        params.producerStateManager,
+        params.logIdentifier)
     }
   }
 
@@ -434,20 +438,20 @@ object LogLoader extends Logging {
 
       while (unflushed.hasNext && !truncated) {
         val segment = unflushed.next()
-        info(s"${params.logIdentifier} Recovering unflushed segment ${segment.baseOffset}")
+        info(s"${params.logIdentifier}Recovering unflushed segment ${segment.baseOffset}")
         val truncatedBytes =
           try {
             recoverSegment(segment, params)
           } catch {
             case _: InvalidOffsetException =>
               val startOffset = segment.baseOffset
-              warn(s"${params.logIdentifier} Found invalid offset during recovery. Deleting the" +
+              warn(s"${params.logIdentifier}Found invalid offset during recovery. Deleting the" +
                 s" corrupt segment and creating an empty one with starting offset $startOffset")
               segment.truncateTo(startOffset)
           }
         if (truncatedBytes > 0) {
           // we had an invalid message, delete all remaining log
-          warn(s"${params.logIdentifier} Corruption found in segment ${segment.baseOffset}," +
+          warn(s"${params.logIdentifier}Corruption found in segment ${segment.baseOffset}," +
             s" truncating to offset ${segment.readNextOffset}")
           removeAndDeleteSegmentsAsync(unflushed.toList, params)
           truncated = true
@@ -519,7 +523,8 @@ object LogLoader extends Logging {
         params.config,
         params.scheduler,
         params.logDirFailureChannel,
-        params.producerStateManager)
+        params.producerStateManager,
+        params.logIdentifier)
     }
   }
 }
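
The remaining hunks are mechanical: each test call site gains an empty-string prefix argument, since these tests construct a single log and don't need per-partition identification in their output. The updated call shape, as a sketch:

    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(
      logDir, topicPartition, logDirFailureChannel,
      config.messageFormatVersion.recordVersion,
      "")  // empty logPrefix: messages are logged without a partition identifier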
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8bb54c4..0cbeec9 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -283,7 +283,7 @@ class PartitionLockTest extends Logging {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
+        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion, "")
         val maxProducerIdExpirationMs = 60 * 60 * 1000
         val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
         val offsets = LogLoader.load(LoadLogParams(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 8e492ac..7030715 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -236,7 +236,7 @@ class PartitionTest extends AbstractPartitionTest {
         val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints, None)
         val logDirFailureChannel = new LogDirFailureChannel(1)
         val segments = new LogSegments(log.topicPartition)
-        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion)
+        val leaderEpochCache = Log.maybeCreateLeaderEpochCache(log.dir, log.topicPartition, logDirFailureChannel, log.config.messageFormatVersion.recordVersion, "")
         val maxProducerIdExpirationMs = 60 * 60 * 1000
         val producerStateManager = new ProducerStateManager(log.topicPartition, log.dir, maxProducerIdExpirationMs)
         val offsets = LogLoader.load(LoadLogParams(
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 3cb32c0..fbdbe20 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -100,7 +100,7 @@ class LogCleanerManagerTest extends Logging {
     val config = createLowRetentionLogConfig(logSegmentSize, LogConfig.Compact)
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val segments = new LogSegments(tp)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(tpDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       tpDir,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 99ff1aa..c137eab 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -105,7 +105,7 @@ class LogCleanerTest {
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val maxProducerIdExpirationMs = 60 * 60 * 1000
     val logSegments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       dir,
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 0546db4..d59ed1d 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -94,7 +94,7 @@ class LogLoaderTest {
           val logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(1)
           val maxProducerIdExpirationMs = 60 * 60 * 1000
           val segments = new LogSegments(topicPartition)
-          val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+          val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
           val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
           val loadLogParams = LoadLogParams(logDir, topicPartition, config, time.scheduler, time,
             logDirFailureChannel, hadCleanShutdown, segments, logStartOffset, logRecoveryPoint,
@@ -264,7 +264,7 @@ class LogLoaderTest {
           super.add(wrapper)
         }
       }
-      val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+      val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
       val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
       val loadLogParams = LoadLogParams(
         logDir,
@@ -337,7 +337,7 @@ class LogLoaderTest {
     val config = LogConfig(new Properties())
     val maxProducerIdExpirationMs = 300000
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -471,7 +471,7 @@ class LogLoaderTest {
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -532,7 +532,7 @@ class LogLoaderTest {
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
@@ -595,7 +595,7 @@ class LogLoaderTest {
     val maxProducerIdExpirationMs = 300000
     val logDirFailureChannel = null
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, "")
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
       topicPartition,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8081d0b..d1d3860 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1486,7 +1486,7 @@ class ReplicaManagerTest {
     val tp = new TopicPartition(topic, topicPartition)
     val maxProducerIdExpirationMs = 30000
     val segments = new LogSegments(tp)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, tp, mockLogDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
     val producerStateManager = new ProducerStateManager(tp, logDir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 876388d..93362e2 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -123,7 +123,7 @@ class SchedulerTest {
     val topicPartition = Log.parseTopicPartitionName(logDir)
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val segments = new LogSegments(topicPartition)
-    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion)
+    val leaderEpochCache = Log.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.messageFormatVersion.recordVersion, "")
     val producerStateManager = new ProducerStateManager(topicPartition, logDir, maxProducerIdExpirationMs)
     val offsets = LogLoader.load(LoadLogParams(
       logDir,