You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/03/26 03:54:09 UTC

[kafka] branch trunk updated: MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8cf781e  MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
8cf781e is described below

commit 8cf781ef0196995ef99d875612b6b7e9adcb912b
Author: Gardner Vickers <ga...@vickers.me>
AuthorDate: Wed Mar 25 20:53:42 2020 -0700

    MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
    
    This PR works to improve high watermark checkpointing performance.
    
    `ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.
    
    Added a JMH benchmark for `checkpointHighWatermarks` which establishes a
    performance baseline. The parameterized benchmark was run with 100, 1000 and
    2000 topics.
    
    Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies and cached
    the Log parent directory String to avoid frequent allocations when calculating
    `File.getParent()`.
    
    A few clean-ups:
    * Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to
    Log.parentDirFile.
    * Only expose public accessor for `Log.dir` (consistent with `Log.parentDir`)
    * Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.
    
    Benchmark results:
    
    | Topic Count | Ops/ms | MB/sec allocated |
    |-------------|---------|------------------|
    | 100               | + 51%    |  - 91% |
    | 1000             | + 143% |  - 49% |
    | 2000            | + 149% |   - 50% |
    
    Reviewers: Lucas Bradstreet <lu...@confluent.io>, Ismael Juma <is...@juma.me.uk>
    
    Co-authored-by: Gardner Vickers <ga...@vickers.me>
    Co-authored-by: Ismael Juma <is...@juma.me.uk>
---
 .gitignore                                         |   1 +
 build.gradle                                       |   6 +-
 checkstyle/import-control-jmh-benchmarks.xml       |   1 +
 checkstyle/import-control.xml                      |  11 --
 core/src/main/scala/kafka/cluster/Partition.scala  |  38 ++---
 core/src/main/scala/kafka/log/Log.scala            |  63 +++++---
 core/src/main/scala/kafka/log/LogCleaner.scala     |   6 +-
 .../main/scala/kafka/log/LogCleanerManager.scala   |   6 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  38 ++---
 .../main/scala/kafka/server/ReplicaManager.scala   |  47 +++---
 .../unit/kafka/cluster/AssignmentStateTest.scala   |   2 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  12 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 110 +++++--------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  45 +++---
 gradle/spotbugs-exclude.xml                        |   3 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |   4 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   2 +-
 .../jmh/server/HighwatermarkCheckpointBench.java   | 174 +++++++++++++++++++++
 19 files changed, 360 insertions(+), 211 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4bdb94e..f17d9dd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,4 +54,5 @@ systest/
 clients/src/generated
 clients/src/generated-test
 jmh-benchmarks/generated
+jmh-benchmarks/src/main/generated
 streams/src/generated
diff --git a/build.gradle b/build.gradle
index aa8bf97..092f4ff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1540,10 +1540,14 @@ project(':jmh-benchmarks') {
     compile project(':core')
     compile project(':clients')
     compile project(':streams')
+    compile project(':core')
+    compile project(':clients').sourceSets.test.output
+    compile project(':core').sourceSets.test.output
     compile libs.jmhCore
-    compile libs.mockitoCore
     annotationProcessor libs.jmhGeneratorAnnProcess
     compile libs.jmhCoreBenchmarks
+    compile libs.mockitoCore
+    compile libs.slf4jlog4j
   }
 
   jar {
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 3536ccb..4b546cb 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -39,6 +39,7 @@
     <allow pkg="kafka.controller"/>
     <allow pkg="kafka.coordinator"/>
     <allow pkg="kafka.network"/>
+    <allow pkg="kafka.utils"/>
     <allow pkg="kafka.zk"/>
     <allow class="kafka.utils.Pool"/>
     <allow class="kafka.utils.KafkaScheduler"/>
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c4a7662..3e41e97 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,17 +284,6 @@
     </subpackage>
   </subpackage>
 
-  <subpackage name="jmh">
-    <allow pkg="org.openjdk.jmh.annotations" />
-    <allow pkg="org.openjdk.jmh.runner" />
-    <allow pkg="org.openjdk.jmh.runner.options" />
-    <allow pkg="org.openjdk.jmh.infra" />
-    <allow pkg="org.apache.kafka.common" />
-    <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.streams" />
-    <allow pkg="org.github.jamm" />
-  </subpackage>
-
   <subpackage name="log4jappender">
     <allow pkg="org.apache.log4j" />
     <allow pkg="org.apache.kafka.clients" />
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e55f89d..0c2fccb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.{Optional, Properties}
 
-import kafka.api.{ApiVersion, LeaderAndIsr, Request}
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log._
@@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition,
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
     inWriteLock(leaderIsrUpdateLock) {
-      val currentLogDir = localLogOrException.dir.getParent
+      val currentLogDir = localLogOrException.parentDir
       if (currentLogDir == logDir) {
         info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
           s"Skipping future replica creation.")
@@ -274,34 +274,34 @@ class Partition(val topicPartition: TopicPartition,
       } else {
         futureLog match {
           case Some(partitionFutureLog) =>
-            val futureLogDir = partitionFutureLog.dir.getParent
+            val futureLogDir = partitionFutureLog.parentDir
             if (futureLogDir != logDir)
               throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
             true
         }
       }
     }
   }
 
-  def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
+  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
     isFutureReplica match {
       case true if futureLog.isEmpty =>
-        val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+        val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
         this.futureLog = Option(log)
       case false if log.isEmpty =>
-        val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+        val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
         this.log = Option(log)
       case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
     }
   }
 
   // Visible for testing
-  private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-    val fetchLogConfig = () => {
+  private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+    def fetchLogConfig: LogConfig = {
       val props = stateStore.fetchTopicConfig()
       LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
     }
@@ -309,8 +309,8 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica)
-      val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse {
+      val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig, isNew, isFutureReplica)
+      val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
         info(s"No checkpointed highwatermark is found for partition $topicPartition")
         0L
       }
@@ -319,7 +319,7 @@ class Partition(val topicPartition: TopicPartition,
       maybeLog = Some(log)
       log
     } finally {
-      logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig)
+      logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig)
     }
   }
 
@@ -410,7 +410,7 @@ class Partition(val topicPartition: TopicPartition,
 
   def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
     inReadLock(leaderIsrUpdateLock) {
-      futureLog.exists(_.dir.getParent != newDestinationDir)
+      futureLog.exists(_.parentDir != newDestinationDir)
     }
   }
 
@@ -478,9 +478,7 @@ class Partition(val topicPartition: TopicPartition,
    * from the time when this broker was the leader last time) and setting the new leader and ISR.
    * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int,
-                 partitionState: LeaderAndIsrPartitionState,
-                 correlationId: Int,
+  def makeLeader(partitionState: LeaderAndIsrPartitionState,
                  highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@@ -493,7 +491,7 @@ class Partition(val topicPartition: TopicPartition,
         addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
         removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
       )
-      createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+      createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
 
       val leaderLog = localLogOrException
       val leaderEpochStartOffset = leaderLog.logEndOffset
@@ -549,9 +547,7 @@ class Partition(val topicPartition: TopicPartition,
    *  greater (that is, no updates have been missed), return false to indicate to the
    * replica manager that state is already correct and the become-follower steps can be skipped
    */
-  def makeFollower(controllerId: Int,
-                   partitionState: LeaderAndIsrPartitionState,
-                   correlationId: Int,
+  def makeFollower(partitionState: LeaderAndIsrPartitionState,
                    highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val newLeaderBrokerId = partitionState.leader
@@ -566,7 +562,7 @@ class Partition(val topicPartition: TopicPartition,
         addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
         removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
       )
-      createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
+      createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
 
       leaderEpoch = partitionState.leaderEpoch
       leaderEpochStartOffsetOpt = None
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c6bce27..20b0cd2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -188,7 +188,7 @@ object RollParams {
  * New log segments are created according to a configurable policy that controls the size in bytes or time interval
  * for a given segment.
  *
- * @param dir The directory in which log segments are created.
+ * @param _dir The directory in which log segments are created.
  * @param config The log configuration settings
  * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
  *                       The logStartOffset can be updated by :
@@ -209,7 +209,7 @@ object RollParams {
  * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  */
 @threadsafe
-class Log(@volatile var dir: File,
+class Log(@volatile private var _dir: File,
           @volatile var config: LogConfig,
           @volatile var logStartOffset: Long,
           @volatile var recoveryPoint: Long,
@@ -228,36 +228,17 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
+
   // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
   // After memory mapped buffer is closed, no disk IO operation should be performed for this log
   @volatile private var isMemoryMappedBufferClosed = false
 
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
   /* last time it was flushed */
   private val lastFlushedTime = new AtomicLong(time.milliseconds)
 
-  def initFileSize: Int = {
-    if (config.preallocate)
-      config.segmentSize
-    else
-      0
-  }
-
-  def updateConfig(newConfig: LogConfig): Unit = {
-    val oldConfig = this.config
-    this.config = newConfig
-    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
-    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
-    if (newRecordVersion.precedes(oldRecordVersion))
-      warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
-    if (newRecordVersion.value != oldRecordVersion.value)
-      initializeLeaderEpochCache()
-  }
-
-  private def checkIfMemoryMappedBufferClosed(): Unit = {
-    if (isMemoryMappedBufferClosed)
-      throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
-  }
-
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
   /* The earliest offset which is part of an incomplete transaction. This is used to compute the
@@ -316,6 +297,35 @@ class Log(@volatile var dir: File,
       s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
   }
 
+  def dir: File = _dir
+
+  def parentDir: String = _parentDir
+
+  def parentDirFile: File = new File(_parentDir)
+
+  def initFileSize: Int = {
+    if (config.preallocate)
+      config.segmentSize
+    else
+      0
+  }
+
+  def updateConfig(newConfig: LogConfig): Unit = {
+    val oldConfig = this.config
+    this.config = newConfig
+    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+    if (newRecordVersion.precedes(oldRecordVersion))
+      warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
+    if (newRecordVersion.value != oldRecordVersion.value)
+      initializeLeaderEpochCache()
+  }
+
+  private def checkIfMemoryMappedBufferClosed(): Unit = {
+    if (isMemoryMappedBufferClosed)
+      throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
+  }
+
   def highWatermark: Long = highWatermarkMetadata.messageOffset
 
   /**
@@ -961,7 +971,8 @@ class Log(@volatile var dir: File,
         val renamedDir = new File(dir.getParent, name)
         Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
         if (renamedDir != dir) {
-          dir = renamedDir
+          _dir = renamedDir
+          _parentDir = renamedDir.getParent
           logSegments.foreach(_.updateDir(renamedDir))
           producerStateManager.logDir = dir
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312483d..22207e0 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -315,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       } catch {
         case e: LogCleaningException =>
           warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
-          cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition)
+          cleanerManager.markPartitionUncleanable(e.log.parentDir, e.log.topicPartition)
 
           false
       }
@@ -365,11 +365,11 @@ class LogCleaner(initialConfig: CleanerConfig,
         case _: LogCleaningAbortedException => // task can be aborted, let it go.
         case _: KafkaStorageException => // partition is already offline. let it go.
         case e: IOException =>
-          val logDirectory = cleanable.log.dir.getParent
+          val logDirectory = cleanable.log.parentDir
           val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
           logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
       } finally {
-        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)
       }
     }
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index fe22a61..ba007c7 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
             val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
             // update checkpoint for logs with invalid checkpointed offsets
             if (offsetsToClean.forceUpdateCheckpoint)
-              updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset))
+              updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
             val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
             preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
 
@@ -379,7 +379,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
           case Some(offset) =>
             // Remove this partition from the checkpoint file in the source log directory
             updateCheckpoints(sourceLogDir, None)
-            // Add offset for this partition to the checkpoint file in the source log directory
+            // Add offset for this partition to the checkpoint file in the destination log directory
             updateCheckpoints(destLogDir, Option(topicPartition, offset))
           case None =>
         }
@@ -478,7 +478,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
     inLock(lock) {
-      uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition))
+      uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
 }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d6b14b0..bf4351c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -199,7 +199,7 @@ class LogManager(logDirs: Seq[File],
         cleaner.handleLogDirFailure(dir)
 
       val offlineCurrentTopicPartitions = currentLogs.collect {
-        case (tp, log) if log.dir.getParent == dir => tp
+        case (tp, log) if log.parentDir == dir => tp
       }
       offlineCurrentTopicPartitions.foreach { topicPartition => {
         val removedLog = currentLogs.remove(topicPartition)
@@ -210,7 +210,7 @@ class LogManager(logDirs: Seq[File],
       }}
 
       val offlineFutureTopicPartitions = futureLogs.collect {
-        case (tp, log) if log.dir.getParent == dir => tp
+        case (tp, log) if log.parentDir == dir => tp
       }
       offlineFutureTopicPartitions.foreach { topicPartition => {
         val removedLog = futureLogs.remove(topicPartition)
@@ -282,7 +282,7 @@ class LogManager(logDirs: Seq[File],
       }
       if (previous != null) {
         if (log.isFuture)
-          throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+          throw new IllegalStateException(s"Duplicate log directories found: ${log.dir.getAbsolutePath}, ${previous.dir.getAbsolutePath}")
         else
           throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
             s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
@@ -514,7 +514,7 @@ class LogManager(logDirs: Seq[File],
           if (log.truncateTo(truncateOffset))
             affectedLogs += log
           if (needToStopCleaner && !isFuture)
-            cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+            cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
           if (needToStopCleaner && !isFuture) {
             cleaner.resumeCleaning(Seq(topicPartition))
@@ -524,7 +524,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
 
-    for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
+    for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
       checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
     }
   }
@@ -551,7 +551,7 @@ class LogManager(logDirs: Seq[File],
       try {
         log.truncateFullyAndStartAt(newOffset)
         if (cleaner != null && !isFuture) {
-          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+          cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
         }
       } finally {
         if (cleaner != null && !isFuture) {
@@ -559,7 +559,7 @@ class LogManager(logDirs: Seq[File],
           info(s"Compaction for partition $topicPartition is resumed")
         }
       }
-      checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log))
+      checkpointRecoveryOffsetsAndCleanSnapshot(log.parentDirFile, Seq(log))
     }
   }
 
@@ -633,8 +633,8 @@ class LogManager(logDirs: Seq[File],
   // The logDir should be an absolute path
   def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
     // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
-    if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
-        !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
+    if (!getLog(topicPartition).exists(_.parentDir == logDir) &&
+        !getLog(topicPartition, isFuture = true).exists(_.parentDir == logDir))
       preferredLogDirs.put(topicPartition, logDir)
   }
 
@@ -723,7 +723,7 @@ class LogManager(logDirs: Seq[File],
           if (isFuture) {
             if (preferredLogDir == null)
               throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
-            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
+            else if (getLog(topicPartition).get.parentDir == preferredLogDir)
               throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
           }
 
@@ -818,7 +818,7 @@ class LogManager(logDirs: Seq[File],
             info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
           } catch {
             case e: KafkaStorageException =>
-              error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
+              error(s"Exception while deleting $removedLog in dir ${removedLog.parentDir}.", e)
           }
         }
       }
@@ -866,7 +866,7 @@ class LogManager(logDirs: Seq[File],
       futureLogs.remove(topicPartition)
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
+        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
         cleaner.resumeCleaning(Seq(topicPartition))
         info(s"Compaction for partition $topicPartition is resumed")
       }
@@ -876,8 +876,8 @@ class LogManager(logDirs: Seq[File],
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
+        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
+        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
         addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
@@ -911,11 +911,11 @@ class LogManager(logDirs: Seq[File],
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
         cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+        cleaner.updateCheckpoints(removedLog.parentDirFile)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
-      checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)
+      checkpointLogStartOffsetsInDir(removedLog.parentDirFile)
       addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
@@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File],
       List(_liveLogDirs.peek())
     } else {
       // count the number of logs in each parent directory (including 0 for empty directories
-      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
+      val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size)
       val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
       val dirCounts = (zeros ++ logCounts).toBuffer
 
@@ -1005,7 +1005,7 @@ class LogManager(logDirs: Seq[File],
    */
   private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
     (this.currentLogs.toList ++ this.futureLogs.toList).toMap
-      .groupBy { case (_, log) => log.dir.getParent }
+      .groupBy { case (_, log) => log.parentDir }
   }
 
   // logDir should be an absolute path
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 084ffa5..b50d9be 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -480,7 +480,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getLogDir(topicPartition: TopicPartition): Option[String] = {
-    localLog(topicPartition).map(_.dir.getParent)
+    localLog(topicPartition).map(_.parentDir)
   }
 
   /**
@@ -661,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig,
    *    are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
    */
   def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
-    val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent)
+    val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
 
     config.logDirs.toSet.map { logDir: String =>
       val absolutePath = new File(logDir).getAbsolutePath
@@ -1303,7 +1303,7 @@ class ReplicaManager(val config: KafkaConfig,
               val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
               // Add future replica to partition's map
-              partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true,
+              partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
                 highWatermarkCheckpoints)
 
               // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
@@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig,
       // Update the partition information to be the leader
       partitionStates.foreach { case (partition, partitionState) =>
         try {
-          if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) {
+          if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) {
             partitionsToMakeLeaders += partition
             stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
               s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
@@ -1451,7 +1451,7 @@ class ReplicaManager(val config: KafkaConfig,
           metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
             // Only change partition state when the leader is available
             case Some(_) =>
-              if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints))
+              if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
                 partitionsToMakeFollower += partition
               else
                 stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
@@ -1468,7 +1468,7 @@ class ReplicaManager(val config: KafkaConfig,
                 s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
               // the partition's high watermark in the checkpoint file (see KAFKA-1647)
-              partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false,
+              partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
                 highWatermarkCheckpoints)
           }
         } catch {
@@ -1600,20 +1600,25 @@ class ReplicaManager(val config: KafkaConfig,
 
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks(): Unit = {
-    val localLogs = nonOfflinePartitionsIterator.flatMap { partition =>
-      val logsList: mutable.Set[Log] = mutable.Set()
-      partition.log.foreach(logsList.add)
-      partition.futureLog.foreach(logsList.add)
-      logsList
-    }.toBuffer
-    val logsByDir = localLogs.groupBy(_.dir.getParent)
-    for ((dir, logs) <- logsByDir) {
-      val hwms = logs.map(log => log.topicPartition -> log.highWatermark).toMap
-      try {
-        highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
-      } catch {
+    def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
+              log: Log): Unit = {
+      val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
+        new mutable.AnyRefMap[TopicPartition, Long]())
+      checkpoints.put(log.topicPartition, log.highWatermark)
+    }
+
+    val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
+      allPartitions.size)
+    nonOfflinePartitionsIterator.foreach { partition =>
+      partition.log.foreach(putHw(logDirToHws, _))
+      partition.futureLog.foreach(putHw(logDirToHws, _))
+    }
+
+    for ((logDir, hws) <- logDirToHws) {
+      try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
+      catch {
         case e: KafkaStorageException =>
-          error(s"Error while writing to highwatermark file in directory $dir", e)
+          error(s"Error while writing to highwatermark file in directory $logDir", e)
       }
     }
   }
@@ -1632,11 +1637,11 @@ class ReplicaManager(val config: KafkaConfig,
     warn(s"Stopping serving replicas in dir $dir")
     replicaStateChangeLock synchronized {
       val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
-        partition.log.exists { _.dir.getParent == dir }
+        partition.log.exists { _.parentDir == dir }
       }.map(_.topicPartition).toSet
 
       val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition =>
-        partition.futureLog.exists { _.dir.getParent == dir }
+        partition.futureLog.exists { _.parentDir == dir }
       }.toSet
 
       replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
index 08dd95e..b218bf7 100644
--- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
@@ -112,7 +112,7 @@ class AssignmentStateTest(isr: List[Integer], replicas: List[Integer],
     if (original.nonEmpty)
       partition.assignmentState = SimpleAssignmentState(original)
     // do the test
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertEquals(isReassigning, partition.isReassigning)
     if (adding.nonEmpty)
       adding.foreach(r => assertTrue(partition.isAddingReplica(r)))
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index cb1fd80..c8a7d33 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -222,8 +222,8 @@ class PartitionLockTest extends Logging {
         }
       }
 
-      override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-        val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
         new SlowLog(log, mockTime, appendSemaphore)
       }
     }
@@ -235,21 +235,21 @@ class PartitionLockTest extends Logging {
     when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
       .thenReturn(Some(2))
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val controllerId = 0
     val controllerEpoch = 0
     val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
     val isr = replicas
 
-    assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+    assertTrue("Expected become leader transition to succeed", partition.makeLeader(new LeaderAndIsrPartitionState()
       .setControllerEpoch(controllerEpoch)
       .setLeader(brokerId)
       .setLeaderEpoch(leaderEpoch)
       .setIsr(isr)
       .setZkVersion(1)
       .setReplicas(replicas)
-      .setIsNew(true), 0, offsetCheckpoints))
+      .setIsNew(true), offsetCheckpoints))
 
     partition
   }
@@ -310,4 +310,4 @@ class PartitionLockTest extends Logging {
       appendInfo
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 7967428..62ec28b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -109,7 +109,7 @@ class PartitionTest extends AbstractPartitionTest {
     val latch = new CountDownLatch(1)
 
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
 
@@ -153,13 +153,13 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       logManager) {
 
-      override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-        val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints)
+      override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
+        val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
         new SlowLog(log, mockTime, appendSemaphore)
       }
     }
 
-    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     val appendThread = new Thread {
       override def run(): Unit = {
@@ -180,7 +180,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeFollower(0, partitionState, 0, offsetCheckpoints))
+    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
 
     appendSemaphore.release()
     appendThread.join()
@@ -194,7 +194,7 @@ class PartitionTest extends AbstractPartitionTest {
   // active segment
   def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
-    partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
     logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
     partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
 
@@ -465,7 +465,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List(leader, follower1, follower2)
     val isr = List[Integer](leader, follower2).asJava
     val leaderEpoch = 8
@@ -486,7 +485,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setIsNew(true)
 
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+      partition.makeLeader(leaderState, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
 
@@ -561,7 +560,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas.map(Int.box).asJava)
       .setIsNew(false)
 
-    assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints))
+    assertTrue(partition.makeFollower(followerState, offsetCheckpoints))
 
     // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
     val newLeaderState = new LeaderAndIsrPartitionState()
@@ -573,7 +572,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas.map(Int.box).asJava)
       .setIsNew(false)
 
-    assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints))
+    assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints))
 
     // Try to get offsets as a client
     fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@@ -636,34 +635,33 @@ class PartitionTest extends AbstractPartitionTest {
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val isr = replicas
 
     if (isLeader) {
       assertTrue("Expected become leader transition to succeed",
-        partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+        partition.makeLeader(new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
           .setLeaderEpoch(leaderEpoch)
           .setIsr(isr)
           .setZkVersion(1)
           .setReplicas(replicas)
-          .setIsNew(true), 0, offsetCheckpoints))
+          .setIsNew(true), offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
     } else {
       assertTrue("Expected become follower transition to succeed",
-        partition.makeFollower(controllerId, new LeaderAndIsrPartitionState()
+        partition.makeFollower(new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId + 1)
           .setLeaderEpoch(leaderEpoch)
           .setIsr(isr)
           .setZkVersion(1)
           .setReplicas(replicas)
-          .setIsNew(true), 0, offsetCheckpoints))
+          .setIsNew(true), offsetCheckpoints))
       assertEquals(leaderEpoch, partition.getLeaderEpoch)
       assertEquals(None, partition.leaderLogIfLocal)
     }
@@ -673,7 +671,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val log = partition.localLogOrException
 
     val initialLogStartOffset = 5L
@@ -723,7 +721,6 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testListOffsetIsolationLevels(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
@@ -731,17 +728,17 @@ class PartitionTest extends AbstractPartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed",
-      partition.makeLeader(controllerId, new LeaderAndIsrPartitionState()
+      partition.makeLeader(new LeaderAndIsrPartitionState()
         .setControllerEpoch(controllerEpoch)
         .setLeader(brokerId)
         .setLeaderEpoch(leaderEpoch)
         .setIsr(isr)
         .setZkVersion(1)
         .setReplicas(replicas)
-        .setIsNew(true), 0, offsetCheckpoints))
+        .setIsNew(true), offsetCheckpoints))
     assertEquals(leaderEpoch, partition.getLeaderEpoch)
 
     val records = createTransactionalRecords(List(
@@ -811,7 +808,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    partition.makeFollower(0, partitionState, 0, offsetCheckpoints)
+    partition.makeFollower(partitionState, offsetCheckpoints)
 
     // Request with same leader and epoch increases by only 1, do become-follower steps
     partitionState = new LeaderAndIsrPartitionState()
@@ -822,7 +819,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
       .setIsNew(false)
-    assertTrue(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+    assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
 
     // Request with same leader and same epoch, skip become-follower steps
     partitionState = new LeaderAndIsrPartitionState()
@@ -832,7 +829,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
       .setZkVersion(1)
       .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
-    assertFalse(partition.makeFollower(0, partitionState, 2, offsetCheckpoints))
+    assertFalse(partition.makeFollower(partitionState, offsetCheckpoints))
   }
 
   @Test
@@ -841,7 +838,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List[Integer](leader, follower1, follower2).asJava
     val isr = List[Integer](leader, follower2).asJava
     val leaderEpoch = 8
@@ -862,7 +858,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas)
       .setIsNew(true)
     assertTrue("Expected first makeLeader() to return 'leader changed'",
-               partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints))
+               partition.makeLeader(leaderState, offsetCheckpoints))
     assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
     assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
 
@@ -898,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)
+    partition.makeFollower(followerState, offsetCheckpoints)
 
     val newLeaderState = new LeaderAndIsrPartitionState()
       .setControllerEpoch(controllerEpoch)
@@ -909,7 +905,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setReplicas(replicas)
       .setIsNew(false)
     assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
-               partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints))
+               partition.makeLeader(newLeaderState, offsetCheckpoints))
     val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
 
     // append records with the latest leader epoch
@@ -937,7 +933,6 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testDelayedFetchAfterAppendRecords(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
@@ -978,7 +973,7 @@ class PartitionTest extends AbstractPartitionTest {
         .setZkVersion(1)
         .setReplicas(replicaIds)
         .setIsNew(true)
-      partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+      partition.makeLeader(leaderState, offsetCheckpoints)
       partitions += partition
     }
 
@@ -1035,8 +1030,7 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   def createTransactionalRecords(records: Iterable[SimpleRecord],
-                                 baseOffset: Long,
-                                 partitionLeaderEpoch: Int = 0): MemoryRecords = {
+                                 baseOffset: Long): MemoryRecords = {
     val producerId = 1L
     val producerEpoch = 0.toShort
     val baseSequence = 0
@@ -1058,7 +1052,6 @@ class PartitionTest extends AbstractPartitionTest {
     val leader = brokerId
     val follower1 = brokerId + 1
     val follower2 = brokerId + 2
-    val controllerId = brokerId + 3
     val replicas = List[Integer](leader, follower1, follower2).asJava
     val isr = List[Integer](leader).asJava
     val leaderEpoch = 8
@@ -1073,7 +1066,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(true)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertTrue(partition.isAtMinIsr)
   }
 
@@ -1082,7 +1075,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 6, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1091,12 +1083,11 @@ class PartitionTest extends AbstractPartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val initializeTimeMs = time.milliseconds()
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1105,7 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas)
           .setIsNew(true),
-        0,
         offsetCheckpoints))
 
     val remoteReplica = partition.getReplica(remoteBrokerId).get
@@ -1146,7 +1136,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1155,11 +1144,10 @@ class PartitionTest extends AbstractPartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1168,7 +1156,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas.map(Int.box).asJava)
           .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@@ -1213,7 +1200,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1222,10 +1208,9 @@ class PartitionTest extends AbstractPartitionTest {
 
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1234,7 +1219,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas)
           .setIsNew(true),
-        0,
         offsetCheckpoints))
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
 
@@ -1268,7 +1252,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1278,11 +1261,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1291,7 +1273,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas.map(Int.box).asJava)
           .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1325,7 +1306,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1335,11 +1315,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1348,7 +1327,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas.map(Int.box).asJava)
           .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1399,7 +1377,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1409,11 +1386,10 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1422,7 +1398,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas.map(Int.box).asJava)
           .setIsNew(true),
-        0,
         offsetCheckpoints)
     )
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@@ -1458,7 +1433,6 @@ class PartitionTest extends AbstractPartitionTest {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
-    val controllerId = 0
     val controllerEpoch = 0
     val leaderEpoch = 5
     val remoteBrokerId = brokerId + 1
@@ -1468,10 +1442,9 @@ class PartitionTest extends AbstractPartitionTest {
     doNothing().when(delayedOperations).checkAndCompleteFetch()
 
     val initializeTimeMs = time.milliseconds()
-    partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
       partition.makeLeader(
-        controllerId,
         new LeaderAndIsrPartitionState()
           .setControllerEpoch(controllerEpoch)
           .setLeader(brokerId)
@@ -1480,7 +1453,6 @@ class PartitionTest extends AbstractPartitionTest {
           .setZkVersion(1)
           .setReplicas(replicas)
           .setIsNew(true),
-        0,
         offsetCheckpoints))
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
@@ -1513,7 +1485,6 @@ class PartitionTest extends AbstractPartitionTest {
     when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
       .thenReturn(Some(4L))
 
-    val controllerId = 0
     val controllerEpoch = 3
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val leaderState = new LeaderAndIsrPartitionState()
@@ -1524,7 +1495,7 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertEquals(4, partition.localLogOrException.highWatermark)
   }
 
@@ -1553,7 +1524,6 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
-    val controllerId = 0
     val controllerEpoch = 3
     val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava
     val isr = List[Integer](brokerId, brokerId + 1).asJava
@@ -1566,11 +1536,11 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertTrue(partition.isUnderReplicated)
 
     leaderState = leaderState.setIsr(replicas)
-    partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints)
     assertFalse(partition.isUnderReplicated)
   }
 
@@ -1626,7 +1596,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1660,7 +1630,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1695,7 +1665,7 @@ class PartitionTest extends AbstractPartitionTest {
       metadataCache,
       spyLogManager)
 
-    partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints)
+    partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e20ed10..4b60020 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -89,7 +89,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
@@ -109,7 +109,7 @@ class ReplicaManagerTest {
       new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
     try {
       val partition = rm.createPartition(new TopicPartition(topic, 1))
-      partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       rm.checkpointHighWatermarks()
     } finally {
@@ -164,7 +164,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -216,7 +216,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
       replicaManager.createPartition(topicPartition)
-        .createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -270,7 +270,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -330,7 +330,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1).asJava
 
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -436,7 +436,7 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -512,7 +512,7 @@ class ReplicaManagerTest {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
       val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false,
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
 
       // Make this replica the leader.
@@ -668,8 +668,8 @@ class ReplicaManagerTest {
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
       val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-      replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
-      replicaManager.createPartition(tp1).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](0, 2).asJava
       val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -782,10 +782,10 @@ class ReplicaManagerTest {
     val tp = new TopicPartition(topic, topicPartition)
     val partition = replicaManager.createPartition(tp)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.createLogIfNotExists(followerBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
-    partition.makeFollower(controllerId,
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.makeFollower(
       leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId, offsetCheckpoints)
+      offsetCheckpoints)
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
@@ -808,7 +808,6 @@ class ReplicaManagerTest {
     val topicPartition = 0
     val followerBrokerId = 0
     val leaderBrokerId = 1
-    val controllerId = 0
     val leaderEpoch = 1
     val leaderEpochIncrement = 2
     val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
@@ -823,11 +822,9 @@ class ReplicaManagerTest {
     val partition = replicaManager.createPartition(tp)
 
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     partition.makeLeader(
-      controllerId,
       leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
-      correlationId,
       offsetCheckpoints
     )
 
@@ -977,7 +974,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
     val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1016,7 +1013,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1064,7 +1061,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1112,7 +1109,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1154,7 +1151,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1193,7 +1190,7 @@ class ReplicaManagerTest {
 
     val tp0 = new TopicPartition(topic, 0)
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
-    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     val partition0Replicas = Seq[Integer](0, 1).asJava
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@@ -1293,7 +1290,7 @@ class ReplicaManagerTest {
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
     val mockLog = new Log(
-      dir = new File(new File(config.logDirs.head), s"$topic-0"),
+      _dir = new File(new File(config.logDirs.head), s"$topic-0"),
       config = LogConfig(),
       logStartOffset = 0L,
       recoveryPoint = 0L,
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index f3ca317..a717400 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -212,10 +212,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
             <Package name="org.apache.kafka.jmh.cache.generated"/>
             <Package name="org.apache.kafka.jmh.common.generated"/>
             <Package name="org.apache.kafka.jmh.record.generated"/>
-            <Package name="org.apache.kafka.jmh.producer.generated"/>
             <Package name="org.apache.kafka.jmh.partition.generated"/>
+            <Package name="org.apache.kafka.jmh.producer.generated"/>
             <Package name="org.apache.kafka.jmh.fetchsession.generated"/>
             <Package name="org.apache.kafka.jmh.fetcher.generated"/>
+            <Package name="org.apache.kafka.jmh.server.generated"/>
         </Or>
     </Match>
 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index a03c9bf..017926c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -159,7 +159,7 @@ public class ReplicaFetcherThreadBenchmark {
                     0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager);
 
-            partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+            partition.makeFollower(partitionState, offsetCheckpoints);
             pool.put(tp, partition);
             offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
             BaseRecords fetched = new BaseRecords() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 2919eba..d78f3e6 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -121,7 +121,7 @@ public class PartitionMakeFollowerBenchmark {
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
             partitionStateStore, delayedOperations,
             Mockito.mock(MetadataCache.class), logManager);
-        partition.createLogIfNotExists(0, true, false, offsetCheckpoints);
+        partition.createLogIfNotExists(true, false, offsetCheckpoints);
         executorService.submit((Runnable) () -> {
             SimpleRecord[] simpleRecords = new SimpleRecord[] {
                 new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)),
@@ -154,7 +154,7 @@ public class PartitionMakeFollowerBenchmark {
             .setZkVersion(1)
             .setReplicas(replicas)
             .setIsNew(true);
-        return partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+        return partition.makeFollower(partitionState, offsetCheckpoints);
     }
 
     private static LogConfig createLogConfig() {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index c3e0746..a686c3b 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -119,7 +119,7 @@ public class UpdateFollowerFetchStateBenchmark {
                 ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
                 partitionStateStore, delayedOperations,
                 Mockito.mock(MetadataCache.class), logManager);
-        partition.makeLeader(0, partitionState, 0, offsetCheckpoints);
+        partition.makeLeader(partitionState, offsetCheckpoints);
     }
 
     // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
new file mode 100644
index 0000000..9cb8ac6
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.server;
+
+import java.util.Properties;
+import kafka.cluster.Partition;
+import kafka.cluster.PartitionStateStore;
+import kafka.log.CleanerConfig;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.ReplicaManager;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.MockTime;
+import kafka.utils.Scheduler;
+import kafka.utils.TestUtils;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import scala.jdk.CollectionConverters;
+
+
+/** JMH benchmark of ReplicaManager.checkpointHighWatermarks() across numTopics * numPartitions logs. */
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@Fork(3)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+public class HighwatermarkCheckpointBench {
+    // Number of topics created for a trial.
+    @Param({"100", "1000", "2000"})
+    public int numTopics;
+
+    // Number of partitions created for each topic.
+    @Param({"3"})
+    public int numPartitions;
+
+    // Prefix of the generated topic names; "-" and the topic index are appended.
+    private final String topicName = "foo";
+    private Scheduler scheduler;
+    private Metrics metrics;
+    private MockTime time;
+    private KafkaConfig brokerProperties;
+    private LogDirFailureChannel failureChannel;
+    private LogManager logManager;
+    private QuotaFactory.QuotaManagers quotaManagers;
+    private ReplicaManager replicaManager;
+
+    /** Per-trial setup: starts a ReplicaManager, creates a log per topic-partition, checkpoints once. */
+    @Setup(Level.Trial)
+    public void setup() {
+        scheduler = new KafkaScheduler(1, "scheduler-thread", true);
+        brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
+                0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
+                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1));
+        metrics = new Metrics();
+        time = new MockTime();
+        failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
+        logManager = newLogManager();
+        scheduler.startup();
+
+        final BrokerTopicStats topicStats = new BrokerTopicStats();
+        final MetadataCache cache = new MetadataCache(brokerProperties.brokerId());
+        quotaManagers = QuotaFactory.instantiate(brokerProperties, metrics, time, "");
+        replicaManager = new ReplicaManager(brokerProperties, metrics, time, stubZkClient(), scheduler,
+                logManager, new AtomicBoolean(false), quotaManagers, topicStats, cache, failureChannel,
+                Option.empty());
+        replicaManager.startup();
+
+        final List<TopicPartition> partitions = allTopicPartitions();
+        // NOTE(review): this stubbed mock is not passed to any call in this class - confirm it is needed.
+        final PartitionStateStore stateStore = Mockito.mock(PartitionStateStore.class);
+        Mockito.when(stateStore.fetchTopicConfig()).thenReturn(new Properties());
+        final OffsetCheckpoints zeroCheckpoints = (logDir, topicPartition) -> Option.apply(0L);
+        for (TopicPartition tp : partitions) {
+            final Partition partition = replicaManager.createPartition(tp);
+            partition.createLogIfNotExists(true, false, zeroCheckpoints);
+        }
+        replicaManager.checkpointHighWatermarks();
+    }
+
+    /** Creates the LogManager through TestUtils over the configured log dirs, using the trial's MockTime. */
+    private LogManager newLogManager() {
+        final List<File> logDirs = CollectionConverters.seqAsJavaList(brokerProperties.logDirs())
+                .stream().map(File::new).collect(Collectors.toList());
+        final CleanerConfig cleanerConfig = CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
+                1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true, "MD5");
+        return TestUtils.createLogManager(
+                CollectionConverters.asScalaBuffer(logDirs), LogConfig.apply(), cleanerConfig, time);
+    }
+
+    /** Returns a KafkaZkClient built with a null first argument whose getEntityConfigs is always empty. */
+    private static KafkaZkClient stubZkClient() {
+        return new KafkaZkClient(null, false, Time.SYSTEM) {
+            @Override
+            public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
+                return new Properties();
+            }
+        };
+    }
+
+    /** Lists partitions 0..numPartitions-1 of each topic "foo-0".."foo-(numTopics-1)", topic-major. */
+    private List<TopicPartition> allTopicPartitions() {
+        final List<TopicPartition> result = new ArrayList<>();
+        for (int topic = 0; topic < numTopics; topic++) {
+            final String name = topicName + "-" + topic;
+            for (int partition = 0; partition < numPartitions; partition++) {
+                result.add(new TopicPartition(name, partition));
+            }
+        }
+        return result;
+    }
+
+    /** Per-trial teardown: shuts the trial's components down, then deletes the live log directories. */
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+        replicaManager.shutdown(false);
+        metrics.close();
+        scheduler.shutdown();
+        quotaManagers.shutdown();
+        for (File logDir : CollectionConverters.asJavaCollection(logManager.liveLogDirs())) {
+            Utils.delete(logDir);
+        }
+    }
+
+    /** The measured operation: one call to ReplicaManager.checkpointHighWatermarks(). */
+    @Benchmark
+    @Threads(1)
+    public void measureCheckpointHighWatermarks() {
+        replicaManager.checkpointHighWatermarks();
+    }
+}