Posted to commits@kafka.apache.org by ij...@apache.org on 2021/02/07 23:52:38 UTC

[kafka] branch 2.8 updated: MINOR: Defer log recovery until LogManager startup (#10039)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new e47b317  MINOR: Defer log recovery until LogManager startup (#10039)
e47b317 is described below

commit e47b317edf14749a1835e520fed7018d3b511515
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Sun Feb 7 18:46:41 2021 -0500

    MINOR: Defer log recovery until LogManager startup (#10039)
    
    Currently log recovery begins as soon as we instantiate `LogManager`, but when using a
    Raft-based metadata quorum we won't have configs until after we catch up on the metadata
    log.  We therefore defer log recovery until we actually invoke `startup()` on the `LogManager`
    instance.  This timing difference has no effect when using ZooKeeper because we
    immediately invoke `startup()` on the instantiated instance, but it gives us the necessary
    flexibility for accurate log recovery with updated configs when using a Raft-based metadata
    quorum.
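    
    As a rough sketch of the shape of this change (stand-in types only, not the
    actual `LogManager` API), recovery moves out of the constructor and into
    `startup()`:
    
        // Sketch only: simplified stand-in for LogManager.
        class DeferredLogManager(configRepository: String => Map[String, String]) {
          // Before this patch, loadLogs() ran here, in the constructor,
          // before a Raft-based broker could know the topic configs.
    
          def startup(topicNames: Set[String]): Unit = {
            // After this patch, recovery runs here instead, once configs are
            // available (immediately for ZooKeeper, later for Raft).
            loadLogs(topicNames.map(t => t -> configRepository(t)).toMap)
          }
    
          private def loadLogs(topicConfigs: Map[String, Map[String, String]]): Unit = {
            // ... load and recover logs using the supplied configs ...
          }
        }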
    
    The `LogCleaner` is currently instantiated during construction just after log recovery
    completes, and then it is started in `startup()`.  As an extra precaution, since we are
    no longer performing recovery during construction, we both instantiate and start the
    log cleaner in `startup()` after log recovery completes.
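    
    A minimal sketch of the new cleaner lifecycle (a stand-in `Cleaner` class,
    not the real `LogCleaner`):
    
        // Sketch only: stand-in Cleaner.
        class Cleaner { def startup(): Unit = () }
    
        class ManagerWithDeferredCleaner(enableCleaner: Boolean) {
          // Before: the cleaner was instantiated in the constructor, right
          // after recovery, and only started in startup().
          @volatile private var _cleaner: Cleaner = _
          def cleaner: Cleaner = _cleaner
    
          def startup(): Unit = {
            // ... log recovery completes first ...
            if (enableCleaner) {
              _cleaner = new Cleaner() // now both created and started here
              _cleaner.startup()
            }
          }
        }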
    
    We also convert `LogManager` to use a `ConfigRepository` to load topic configs
    (which can override the default log configs) instead of having a hard-coded
    dependency on ZooKeeper.  We retrieve the topic configs when we invoke `startup()`
    -- which, from a timing perspective, is effectively no different from what we
    do today in the ZooKeeper case.
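    
    The trait boundary looks roughly like this (simplified; the real
    `ConfigRepository` in `kafka.server.metadata` works in terms of
    `ConfigResource` and `java.util.Properties`):
    
        // Simplified sketch of the abstraction.
        trait ConfigRepository {
          def topicConfig(topicName: String): Map[String, String]
        }
    
        // Backed by ZooKeeper today (ZkConfigRepository) and by locally cached
        // state for Raft (CachedConfigRepository); LogManager only sees the
        // trait, never ZooKeeper itself.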
    
    One subtlety is that currently we create the log configs for every topic at this point
    -- if a topic has no config overrides then we associate a copy of the default
    configuration with the topic inside a map, and we retrieve the log configs for that
    topic's partitions from that map during recovery.  This PR changes that
    sequence of events as follows: we do not associate a copy of the default
    configuration with a topic in the map if the topic has no configs set when we
    query for them.  This saves some memory -- we don't unnecessarily copy the
    default config many times -- but it also means we have to use the default log
    configs for that topic later on, when recovery for each of its partitions begins.
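    
    Condensed from the patch (`fetchTopicConfigOverrides` plus the fallback in
    `loadLog`), with stand-in types:
    
        object OverridesSketch {
          type LogConfig = Map[String, String] // stand-in for kafka.log.LogConfig
    
          def fetchTopicConfigOverrides(topicNames: Set[String],
                                        topicConfig: String => Map[String, String],
                                        defaults: LogConfig): Map[String, LogConfig] = {
            val overridesByTopic = scala.collection.mutable.Map[String, LogConfig]()
            topicNames.foreach { topic =>
              val overrides = topicConfig(topic)
              if (overrides.nonEmpty) // skip override-free topics: no default copies
                overridesByTopic(topic) = defaults ++ overrides
            }
            overridesByTopic.toMap
          }
    
          // Later, during recovery, a missing entry falls back to the *current*
          // default config:
          //   val config = overridesByTopic.getOrElse(topic, currentDefaultConfig)
        }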
    
    The difference is that the default configs are dynamically reconfigurable, and they
    could potentially change between the time when we invoke `startup()` and when
    log recovery begins (log recovery can begin quite some time after `startup()` is
    invoked if shutdown was unclean).  Prior to this patch such a change would not
    be used; with this patch it can be, provided it happens before recovery begins.
    This is actually better -- we perform log recovery with the most recent
    known defaults when a topic had no overrides at all.  Also, `Partition.createLog`
    has logic to handle missed config updates, so the behavior is eventually the same.
    
    The transition of the broker state from `STARTING` to `RECOVERY` currently
    happens within the `LogManager`, and it only occurs if the shutdown was
    unclean.  We move this transition into the broker, which avoids passing a
    reference to the broker state into the `LogManager`.  We also now always
    transition the broker into the `RECOVERY` state as dictated by [the KIP-631 broker state machine](https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorumbased+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-TheBrokerStateMachine).
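    
    A sketch of the new ordering in the broker (stand-in types; in `KafkaServer`
    this is `brokerState.set(BrokerState.RECOVERY)` followed by
    `logManager.startup(...)`, as the diff below shows):
    
        import java.util.concurrent.atomic.AtomicReference
    
        object BrokerStartupSketch {
          sealed trait BrokerState
          case object Starting extends BrokerState
          case object Recovery extends BrokerState
    
          def startLogManager(brokerState: AtomicReference[BrokerState],
                              startup: () => Unit): Unit = {
            // Always enter RECOVERY before loading logs, per the KIP-631 state
            // machine -- no longer conditional on an unclean shutdown.
            brokerState.set(Recovery)
            startup()
          }
        }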
    
    Finally, a few clean-ups were included. One worth highlighting is that `Partition`
    no longer requires a `ConfigRepository`.
    
    Reviewers: David Arthur <da...@confluent.io>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  11 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  92 ++++++-----
 core/src/main/scala/kafka/server/KafkaServer.scala |   7 +-
 .../server/metadata/CachedConfigRepository.scala   |   2 +-
 .../kafka/server/metadata/ZkConfigRepository.scala |  13 +-
 .../unit/kafka/cluster/AbstractPartitionTest.scala |  18 +-
 .../unit/kafka/cluster/AssignmentStateTest.scala   |   3 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  15 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  73 ++++----
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 183 +++++++++++----------
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  18 +-
 .../server/HighwatermarkPersistenceTest.scala      |   6 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  12 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   7 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |   7 +-
 .../UpdateFollowerFetchStateBenchmark.java         |   7 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   2 +-
 19 files changed, 245 insertions(+), 242 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e4d5d1..6f6cb88 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -95,7 +95,6 @@ object Partition extends KafkaMetricsGroup {
       interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
       localBrokerId = replicaManager.config.brokerId,
       time = time,
-      configRepository = configRepository,
       isrChangeListener = isrChangeListener,
       delayedOperations = delayedOperations,
       metadataCache = replicaManager.metadataCache,
@@ -218,7 +217,6 @@ class Partition(val topicPartition: TopicPartition,
                 interBrokerProtocolVersion: ApiVersion,
                 localBrokerId: Int,
                 time: Time,
-                configRepository: ConfigRepository,
                 isrChangeListener: IsrChangeListener,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
@@ -331,11 +329,6 @@ class Partition(val topicPartition: TopicPartition,
 
   // Visible for testing
   private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
-    def fetchLogConfig: LogConfig = {
-      val props = configRepository.topicConfig(topic)
-      LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
-    }
-
     def updateHighWatermark(log: Log) = {
       val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
         info(s"No checkpointed highwatermark is found for partition $topicPartition")
@@ -348,12 +341,12 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, () => fetchLogConfig, isNew, isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica)
       maybeLog = Some(log)
       updateHighWatermark(log)
       log
     } finally {
-      logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig)
+      logManager.finishedInitializingLog(topicPartition, maybeLog)
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 761ba69..b788bf0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -20,17 +20,16 @@ package kafka.log
 import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.metadata.ConfigRepository
 import kafka.server._
 import kafka.utils._
-import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
-import org.apache.kafka.metadata.BrokerState
 
 import scala.jdk.CollectionConverters._
 import scala.collection._
@@ -51,7 +50,7 @@ import kafka.utils.Implicits._
 @threadsafe
 class LogManager(logDirs: Seq[File],
                  initialOfflineDirs: Seq[File],
-                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
+                 configRepository: ConfigRepository,
                  val initialDefaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
                  recoveryThreadsPerDataDir: Int,
@@ -61,7 +60,6 @@ class LogManager(logDirs: Seq[File],
                  val retentionCheckMs: Long,
                  val maxPidExpirationMs: Int,
                  scheduler: Scheduler,
-                 val brokerState: AtomicReference[BrokerState],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time) extends Logging with KafkaMetricsGroup {
@@ -118,13 +116,8 @@ class LogManager(logDirs: Seq[File],
     logDirsSet
   }
 
-  loadLogs()
-
-  private[kafka] val cleaner: LogCleaner =
-    if (cleanerConfig.enableCleaner)
-      new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
-    else
-      null
+  @volatile private var _cleaner: LogCleaner = _
+  private[kafka] def cleaner: LogCleaner = _cleaner
 
   newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
 
@@ -255,11 +248,12 @@ class LogManager(logDirs: Seq[File],
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
 
   private[log] def loadLog(logDir: File,
-                      hadCleanShutdown: Boolean,
-                      recoveryPoints: Map[TopicPartition, Long],
-                      logStartOffsets: Map[TopicPartition, Long]): Log = {
+                           hadCleanShutdown: Boolean,
+                           recoveryPoints: Map[TopicPartition, Long],
+                           logStartOffsets: Map[TopicPartition, Long],
+                           topicConfigOverrides: Map[String, LogConfig]): Log = {
     val topicPartition = Log.parseTopicPartitionName(logDir)
-    val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
+    val config = topicConfigOverrides.getOrElse(topicPartition.topic, currentDefaultConfig)
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
     val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
@@ -302,7 +296,7 @@ class LogManager(logDirs: Seq[File],
   /**
    * Recover and load all logs in the given data directories
    */
-  private[log] def loadLogs(): Unit = {
+  private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
     info(s"Loading logs from log dirs $liveLogDirs")
     val startMs = time.hiResClockMs()
     val threadPools = ArrayBuffer.empty[ExecutorService]
@@ -327,7 +321,6 @@ class LogManager(logDirs: Seq[File],
         } else {
           // log recovery itself is being performed by `Log` class during initialization
           info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
-          brokerState.set(BrokerState.RECOVERY)
         }
 
         var recoveryPoints = Map[TopicPartition, Long]()
@@ -348,7 +341,8 @@ class LogManager(logDirs: Seq[File],
               s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
         }
 
-        val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
+        val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
+          logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         val numLogsLoaded = new AtomicInteger(0)
         numTotalLogs += logsToLoad.length
 
@@ -358,7 +352,7 @@ class LogManager(logDirs: Seq[File],
               debug(s"Loading log $logDir")
 
               val logLoadStartMs = time.hiResClockMs()
-              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets)
+              val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
               val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
               val currentNumLoaded = numLogsLoaded.incrementAndGet()
 
@@ -403,7 +397,29 @@ class LogManager(logDirs: Seq[File],
   /**
    *  Start the background threads to flush logs and do log cleanup
    */
-  def startup(): Unit = {
+  def startup(topicNames: Set[String]): Unit = {
+    startupWithConfigOverrides(fetchTopicConfigOverrides(topicNames))
+  }
+
+  // visible for testing
+  private[log] def fetchTopicConfigOverrides(topicNames: Set[String]): Map[String, LogConfig] = {
+    val topicConfigOverrides = mutable.Map[String, LogConfig]()
+    val defaultProps = currentDefaultConfig.originals()
+    topicNames.foreach { topicName =>
+      val overrides = configRepository.topicConfig(topicName)
+      // save memory by only including configs for topics with overrides
+      if (!overrides.isEmpty) {
+        val logConfig = LogConfig.fromProps(defaultProps, overrides)
+        topicConfigOverrides(topicName) = logConfig
+      }
+    }
+    topicConfigOverrides
+  }
+
+  // visible for testing
+  private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {
+    loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean
+
     /* Schedule the cleanup task to delete old logs */
     if (scheduler != null) {
       info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
@@ -433,8 +449,10 @@ class LogManager(logDirs: Seq[File],
                          delay = InitialTaskDelayMs,
                          unit = TimeUnit.MILLISECONDS)
     }
-    if (cleanerConfig.enableCleaner)
-      cleaner.startup()
+    if (cleanerConfig.enableCleaner) {
+      _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
+      _cleaner.startup()
+    }
   }
 
   /**
@@ -736,11 +754,15 @@ class LogManager(logDirs: Seq[File],
    * relevant log was being loaded.
    */
   def finishedInitializingLog(topicPartition: TopicPartition,
-                              maybeLog: Option[Log],
-                              fetchLogConfig: () => LogConfig): Unit = {
+                              maybeLog: Option[Log]): Unit = {
     val removedValue = partitionsInitializing.remove(topicPartition)
     if (removedValue.contains(true))
-      maybeLog.foreach(_.updateConfig(fetchLogConfig()))
+      maybeLog.foreach(_.updateConfig(fetchLogConfig(topicPartition.topic)))
+  }
+
+  private def fetchLogConfig(topicName: String): LogConfig = {
+    val props = configRepository.topicConfig(topicName)
+    LogConfig.fromProps(currentDefaultConfig.originals, props)
   }
 
   /**
@@ -754,7 +776,7 @@ class LogManager(logDirs: Seq[File],
    * @param isFuture True if the future log of the specified partition should be returned or created
    * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
    */
-  def getOrCreateLog(topicPartition: TopicPartition, loadConfig: () => LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false): Log = {
     logCreationOrDeletionLock synchronized {
       getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
@@ -791,7 +813,7 @@ class LogManager(logDirs: Seq[File],
           .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
           .get // If Failure, will throw
 
-        val config = loadConfig()
+        val config = fetchLogConfig(topicPartition.topic)
         val log = Log(
           dir = logDir,
           config = config,
@@ -1182,8 +1204,7 @@ object LogManager {
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
-            zkClient: KafkaZkClient,
-            brokerState: AtomicReference[BrokerState],
+            configRepository: ConfigRepository,
             kafkaScheduler: KafkaScheduler,
             time: Time,
             brokerTopicStats: BrokerTopicStats,
@@ -1193,18 +1214,11 @@ object LogManager {
     LogConfig.validateValues(defaultProps)
     val defaultLogConfig = LogConfig(defaultProps)
 
-    // read the log configurations from zookeeper
-    val (topicConfigs, failed) = zkClient.getLogConfigs(
-      zkClient.getAllTopicsInCluster(),
-      defaultProps
-    )
-    if (!failed.isEmpty) throw failed.head._2
-
     val cleanerConfig = LogCleaner.cleanerConfig(config)
 
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
-      topicConfigs = topicConfigs,
+      configRepository = configRepository,
       initialDefaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,
       recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
@@ -1214,9 +1228,9 @@ object LogManager {
       retentionCheckMs = config.logCleanupIntervalMs,
       maxPidExpirationMs = config.transactionalIdExpirationMs,
       scheduler = kafkaScheduler,
-      brokerState = brokerState,
       brokerTopicStats = brokerTopicStats,
       logDirFailureChannel = logDirFailureChannel,
       time = time)
   }
+
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index dcd0ff0..7aed40d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -244,8 +244,11 @@ class KafkaServer(
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
         /* start log manager */
-        logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
-        logManager.startup()
+        logManager = LogManager(config, initialOfflineDirs,
+          new ZkConfigRepository(new AdminZkClient(zkClient)),
+          kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
+        brokerState.set(BrokerState.RECOVERY)
+        logManager.startup(zkClient.getAllTopicsInCluster())
 
         metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
         // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
diff --git a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
index 22b4ef3..2b52106 100644
--- a/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/CachedConfigRepository.scala
@@ -31,7 +31,7 @@ import scala.jdk.CollectionConverters._
  * A ConfigRepository that stores configurations locally.
  */
 class CachedConfigRepository extends ConfigRepository {
-  val configMap = new ConcurrentHashMap[ConfigResource, util.HashMap[String, String]]
+  private val configMap = new ConcurrentHashMap[ConfigResource, util.HashMap[String, String]]
 
   /**
    * Set the topic config for the given topic name and the given key to the given value.
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 02df027..95fe752 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -32,14 +32,11 @@ object ZkConfigRepository {
 
 class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
   override def config(configResource: ConfigResource): Properties = {
-    val configResourceType = configResource.`type`()
-    val configTypeForZk = if (configResourceType == Type.TOPIC) {
-      ConfigType.Topic
-    } else if (configResourceType == Type.BROKER) {
-      ConfigType.Broker
-    } else {
-      throw new IllegalArgumentException(s"Unsupported config type: $configResourceType")
+    val configTypeForZk = configResource.`type` match {
+      case Type.TOPIC => ConfigType.Topic
+      case Type.BROKER => ConfigType.Broker
+      case tpe => throw new IllegalArgumentException(s"Unsupported config type: $tpe")
     }
-    adminZkClient.fetchEntityConfig(configTypeForZk, configResource.name())
+    adminZkClient.fetchEntityConfig(configTypeForZk, configResource.name)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 139f694..a897fe8 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -18,12 +18,11 @@ package kafka.cluster
 
 import java.io.File
 import java.util.Properties
-
 import kafka.api.ApiVersion
 import kafka.log.{CleanerConfig, LogConfig, LogManager}
 import kafka.server.{Defaults, MetadataCache}
 import kafka.server.checkpoints.OffsetCheckpoints
-import kafka.server.metadata.ConfigRepository
+import kafka.server.metadata.CachedConfigRepository
 import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -32,9 +31,13 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.mockito.ArgumentMatchers
 import org.mockito.Mockito.{mock, when}
 
+object AbstractPartitionTest {
+  val brokerId = 101
+}
+
 class AbstractPartitionTest {
 
-  val brokerId = 101
+  val brokerId = AbstractPartitionTest.brokerId
   val topicPartition = new TopicPartition("test-topic", 0)
   val time = new MockTime()
   var tmpDir: File = _
@@ -44,7 +47,7 @@ class AbstractPartitionTest {
   var alterIsrManager: MockAlterIsrManager = _
   var isrChangeListener: MockIsrChangeListener = _
   var logConfig: LogConfig = _
-  var configRepository: ConfigRepository = _
+  var configRepository: CachedConfigRepository = _
   val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
   val metadataCache: MetadataCache = mock(classOf[MetadataCache])
   val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
@@ -61,9 +64,9 @@ class AbstractPartitionTest {
     tmpDir = TestUtils.tempDir()
     logDir1 = TestUtils.randomPartitionLogDir(tmpDir)
     logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
-    logManager = TestUtils.createLogManager(
-      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, CleanerConfig(enableCleaner = false), time)
-    logManager.startup()
+    logManager = TestUtils.createLogManager(Seq(logDir1, logDir2), logConfig, configRepository,
+      CleanerConfig(enableCleaner = false), time)
+    logManager.startup(Set.empty)
 
     alterIsrManager = TestUtils.createAlterIsrManager()
     isrChangeListener = TestUtils.createIsrChangeListener()
@@ -72,7 +75,6 @@ class AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      configRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
diff --git a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
index 098988c..5e087c5 100644
--- a/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala
@@ -23,7 +23,8 @@ import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
 import scala.jdk.CollectionConverters._
 
-object AssignmentStateTest extends AbstractPartitionTest {
+object AssignmentStateTest {
+  import AbstractPartitionTest._
 
   def parameters: java.util.stream.Stream[Arguments] = Seq[Arguments](
     Arguments.of(
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 8f741cc..b105df3 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -63,11 +63,15 @@ class PartitionLockTest extends Logging {
   var logManager: LogManager = _
   var partition: Partition = _
 
+  private val topicPartition = new TopicPartition("test-topic", 0)
+
   @BeforeEach
   def setUp(): Unit = {
     val logConfig = new LogConfig(new Properties)
-    logManager = TestUtils.createLogManager(Seq(logDir), logConfig, CleanerConfig(enableCleaner = false), mockTime)
-    partition = setupPartitionWithMocks(logManager, logConfig)
+    val configRepository = TestUtils.createConfigRepository(topicPartition.topic, createLogProperties(Map.empty))
+    logManager = TestUtils.createLogManager(Seq(logDir), logConfig, configRepository,
+      CleanerConfig(enableCleaner = false), mockTime)
+    partition = setupPartitionWithMocks(logManager)
   }
 
   @AfterEach
@@ -244,24 +248,21 @@ class PartitionLockTest extends Logging {
     }): Runnable)
   }
 
-  private def setupPartitionWithMocks(logManager: LogManager, logConfig: LogConfig): Partition = {
+  private def setupPartitionWithMocks(logManager: LogManager): Partition = {
     val leaderEpoch = 1
     val brokerId = 0
-    val topicPartition = new TopicPartition("test-topic", 0)
-    val topicConfigProvider = TestUtils.createConfigRepository(topicPartition.topic(), createLogProperties(Map.empty))
     val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener])
     val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
     val alterIsrManager: AlterIsrManager = mock(classOf[AlterIsrManager])
 
-    logManager.startup()
+    logManager.startup(Set.empty)
     val partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = kafka.server.Defaults.ReplicaLagTimeMaxMs,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       mockTime,
-      topicConfigProvider,
       isrChangeListener,
       delayedOperations,
       metadataCache,
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 3c9c569..9e66969 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -51,7 +51,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testLastFetchedOffsetValidation(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     def append(leaderEpoch: Int, count: Int): Unit = {
       val recordArray = (1 to count).map { i =>
         new SimpleRecord(s"$i".getBytes)
@@ -130,7 +130,7 @@ class PartitionTest extends AbstractPartitionTest {
   def testMakeLeaderUpdatesEpochCache(): Unit = {
     val leaderEpoch = 8
 
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     log.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes)
@@ -154,9 +154,9 @@ class PartitionTest extends AbstractPartitionTest {
   def testMakeLeaderDoesNotUpdateEpochCacheForOldFormats(): Unit = {
     val leaderEpoch = 8
 
-    val logConfig = LogConfig(createLogProperties(Map(
-      LogConfig.MessageFormatVersionProp -> kafka.api.KAFKA_0_10_2_IV0.shortVersion)))
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    configRepository.setTopicConfig(topicPartition.topic,
+      LogConfig.MessageFormatVersionProp, kafka.api.KAFKA_0_10_2_IV0.shortVersion)
+    val log = logManager.getOrCreateLog(topicPartition)
     log.appendAsLeader(TestUtils.records(List(
       new SimpleRecord("k1".getBytes, "v1".getBytes),
       new SimpleRecord("k2".getBytes, "v2".getBytes)),
@@ -224,7 +224,6 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      configRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -714,7 +713,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
-                                      log: Log = logManager.getOrCreateLog(topicPartition, () => logConfig)): Partition = {
+                                      log: Log = logManager.getOrCreateLog(topicPartition)): Partition = {
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val controllerEpoch = 0
@@ -1050,7 +1049,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testUpdateFollowerFetchState(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 6, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1106,7 +1105,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testIsrExpansion(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1167,7 +1166,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testIsrNotExpandedIfUpdateFails(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1221,7 +1220,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testMaybeShrinkIsr(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1268,7 +1267,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testShouldNotShrinkIsrIfPreviousFetchIsCaughtUp(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1333,7 +1332,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testShouldNotShrinkIsrIfFollowerCaughtUpToLogEnd(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1384,7 +1383,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testIsrNotShrunkIfUpdateFails(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1463,7 +1462,7 @@ class PartitionTest extends AbstractPartitionTest {
   }
 
   def handleAlterIsrFailure(error: Errors, callback: (Int, Int, Partition) => Unit): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1510,7 +1509,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testSingleInFlightAlterIsr(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1569,14 +1568,13 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = KAFKA_2_6_IV0, // shouldn't matter, but set this to a ZK isr version
       localBrokerId = brokerId,
       time,
-      configRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
       logManager,
       zkIsrManager)
 
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 10, leaderEpoch = 4)
 
     val controllerEpoch = 0
@@ -1619,7 +1617,7 @@ class PartitionTest extends AbstractPartitionTest {
 
   @Test
   def testUseCheckpointToInitializeHighWatermark(): Unit = {
-    val log = logManager.getOrCreateLog(topicPartition, () => logConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     seedLogData(log, numRecords = 6, leaderEpoch = 5)
 
     when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
@@ -1689,7 +1687,7 @@ class PartitionTest extends AbstractPartitionTest {
     val topicPartition = new TopicPartition("test", 1)
     val partition = new Partition(
       topicPartition, 1000, ApiVersion.latestVersion, 0,
-      new SystemTime(), configRepository, mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
+      new SystemTime(), mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]),
       mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager]))
 
     val replicas = Seq(0, 1, 2, 3)
@@ -1723,14 +1721,17 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testLogConfigNotDirty(): Unit = {
-    val spyLogManager = spy(logManager)
+    logManager.shutdown()
     val spyConfigRepository = spy(configRepository)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
+      cleanerConfig = CleanerConfig(enableCleaner = false), time = time)
+    val spyLogManager = spy(logManager)
     val partition = new Partition(topicPartition,
       replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      spyConfigRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1741,9 +1742,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
-    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
 
     // We should retrieve configs only once
     verify(spyConfigRepository, times(1)).topicConfig(topicPartition.topic())
@@ -1755,9 +1754,13 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testLogConfigDirtyAsTopicUpdated(): Unit = {
+    logManager.shutdown()
     val spyConfigRepository = spy(configRepository)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
+      cleanerConfig = CleanerConfig(enableCleaner = false), time = time)
     val spyLogManager = spy(logManager)
-    doAnswer((invocation: InvocationOnMock) => {
+    doAnswer((_: InvocationOnMock) => {
       logManager.initializingLog(topicPartition)
       logManager.topicConfigUpdated(topicPartition.topic())
     }).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1767,7 +1770,6 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      spyConfigRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1778,9 +1780,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
-    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
 
     // We should retrieve configs twice, once before log is created, and second time once
     // we find log config is dirty and refresh it.
@@ -1793,9 +1793,15 @@ class PartitionTest extends AbstractPartitionTest {
    */
   @Test
   def testLogConfigDirtyAsBrokerUpdated(): Unit = {
+    logManager.shutdown()
     val spyConfigRepository = spy(configRepository)
+    logManager = TestUtils.createLogManager(
+      logDirs = Seq(logDir1, logDir2), defaultConfig = logConfig, configRepository = spyConfigRepository,
+      cleanerConfig = CleanerConfig(enableCleaner = false), time = time)
+    logManager.startup(Set.empty)
+
     val spyLogManager = spy(logManager)
-    doAnswer((invocation: InvocationOnMock) => {
+    doAnswer((_: InvocationOnMock) => {
       logManager.initializingLog(topicPartition)
       logManager.brokerConfigUpdated()
     }).when(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@@ -1805,7 +1811,6 @@ class PartitionTest extends AbstractPartitionTest {
       interBrokerProtocolVersion = ApiVersion.latestVersion,
       localBrokerId = brokerId,
       time,
-      spyConfigRepository,
       isrChangeListener,
       delayedOperations,
       metadataCache,
@@ -1816,9 +1821,7 @@ class PartitionTest extends AbstractPartitionTest {
 
     // Validate that initializingLog and finishedInitializingLog was called
     verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
-    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any()) // This doesn't get evaluated, but needed to satisfy compilation
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.any())
 
     // We should get configs twice, once before log is created, and second time once
     // we find log config is dirty and refresh it.
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 6eb820f..01ca38c 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,18 +20,18 @@ package kafka.log
 import com.yammer.metrics.core.MetricName
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.metadata.{CachedConfigRepository, ConfigRepository}
 import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.utils._
 import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito
-import org.mockito.Mockito.{doAnswer, spy}
+import org.mockito.{ArgumentMatchers, Mockito}
+import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify}
 
 import java.io._
 import java.nio.file.Files
@@ -61,7 +61,7 @@ class LogManagerTest {
   def setUp(): Unit = {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
-    logManager.startup()
+    logManager.startup(Set.empty)
   }
 
   @AfterEach
@@ -79,7 +79,7 @@ class LogManagerTest {
    */
   @Test
   def testCreateLog(): Unit = {
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     assertEquals(1, logManager.liveLogDirs.size)
 
     val logFile = new File(logDir, name + "-0")
@@ -102,10 +102,10 @@ class LogManagerTest {
       logManagerForTest = Some(createLogManager(Seq(logDir1, logDir2)))
 
       assertEquals(2, logManagerForTest.get.liveLogDirs.size)
-      logManagerForTest.get.startup()
+      logManagerForTest.get.startup(Set.empty)
 
-      val log1 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
-      val log2 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 1), () => logConfig)
+      val log1 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 0))
+      val log2 = logManagerForTest.get.getOrCreateLog(new TopicPartition(name, 1))
 
       val logFile1 = new File(logDir1, name + "-0")
       assertTrue(logFile1.exists)
@@ -145,9 +145,9 @@ class LogManagerTest {
 
     logManager.shutdown()
     logManager = createLogManager(dirs)
-    logManager.startup()
+    logManager.startup(Set.empty)
 
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig, isNew = true)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), isNew = true)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -175,11 +175,11 @@ class LogManagerTest {
         invocation.callRealMethod().asInstanceOf[Try[File]]
       }
     }.when(logManager).createLogDirectory(any(), any())
-    logManager.startup()
+    logManager.startup(Set.empty)
 
     // Request creating a new log.
     // LogManager should try using all configured log directories until one succeeds.
-    logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig, isNew = true)
+    logManager.getOrCreateLog(new TopicPartition(name, 0), isNew = true)
 
     // Verify that half the directories were considered broken,
     assertEquals(dirs.length / 2, brokenDirs.size)
@@ -208,7 +208,7 @@ class LogManagerTest {
    */
   @Test
   def testCleanupExpiredSegments(): Unit = {
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     var offset = 0L
     for(_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -245,16 +245,16 @@ class LogManagerTest {
   def testCleanupSegmentsToMaintainSize(): Unit = {
     val setSize = TestUtils.singletonRecords("test".getBytes()).sizeInBytes
     logManager.shutdown()
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer)
-    logProps.put(LogConfig.RetentionBytesProp, 5L * 10L * setSize + 10L: java.lang.Long)
-    val config = LogConfig.fromProps(logConfig.originals, logProps)
+    val configRepository = new CachedConfigRepository
+    val segmentBytes = 10 * setSize
+    configRepository.setTopicConfig(name, LogConfig.SegmentBytesProp, segmentBytes.toString)
+    configRepository.setTopicConfig(name, LogConfig.RetentionBytesProp, (5L * 10L * setSize + 10L).toString)
 
-    logManager = createLogManager()
-    logManager.startup()
+    logManager = createLogManager(configRepository = configRepository)
+    logManager.startup(Set.empty)
 
     // create a log
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => config)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     var offset = 0L
 
     // add a bunch of messages that should be larger than the retentionSize
@@ -266,7 +266,7 @@ class LogManagerTest {
     }
 
     log.updateHighWatermark(log.logEndOffset)
-    assertEquals(numMessages * setSize / config.segmentSize, log.numberOfSegments, "Check we have the expected number of segments.")
+    assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     time.sleep(logManager.InitialTaskDelayMs)
@@ -301,9 +301,12 @@ class LogManagerTest {
   }
 
   private def testDoesntCleanLogs(policy: String): Unit = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, policy)
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => LogConfig.fromProps(logConfig.originals, logProps))
+    logManager.shutdown()
+    val configRepository = new CachedConfigRepository
+    configRepository.setTopicConfig(name, LogConfig.CleanupPolicyProp, policy)
+
+    logManager = createLogManager(configRepository = configRepository)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     var offset = 0L
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes(), key="test".getBytes())
@@ -326,13 +329,12 @@ class LogManagerTest {
   @Test
   def testTimeBasedFlush(): Unit = {
     logManager.shutdown()
-    val logProps = new Properties()
-    logProps.put(LogConfig.FlushMsProp, 1000: java.lang.Integer)
-    val config = LogConfig.fromProps(logConfig.originals, logProps)
+    val configRepository = new CachedConfigRepository
+    configRepository.setTopicConfig(name, LogConfig.FlushMsProp, "1000")
 
-    logManager = createLogManager()
-    logManager.startup()
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => config)
+    logManager = createLogManager(configRepository = configRepository)
+    logManager.startup(Set.empty)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     val lastFlush = log.lastFlushTime
     for (_ <- 0 until 200) {
       val set = TestUtils.singletonRecords("test".getBytes())
@@ -356,7 +358,7 @@ class LogManagerTest {
 
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
-      logManager.getOrCreateLog(new TopicPartition("test", partition), () => logConfig)
+      logManager.getOrCreateLog(new TopicPartition("test", partition))
       assertEquals(partition + 1, logManager.allLogs.size, "We should have created the right number of logs")
       val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
       assertTrue(counts.max <= counts.min + 1, "Load should balance evenly")
@@ -386,7 +388,7 @@ class LogManagerTest {
   def testRecoveryDirectoryMappingWithTrailingSlash(): Unit = {
     logManager.shutdown()
     logManager = TestUtils.createLogManager(logDirs = Seq(new File(TestUtils.tempDir().getAbsolutePath + File.separator)))
-    logManager.startup()
+    logManager.startup(Set.empty)
     verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
   }
 
@@ -397,12 +399,12 @@ class LogManagerTest {
   def testRecoveryDirectoryMappingWithRelativeDirectory(): Unit = {
     logManager.shutdown()
     logManager = createLogManager(Seq(new File("data", logDir.getName).getAbsoluteFile))
-    logManager.startup()
+    logManager.startup(Set.empty)
     verifyCheckpointRecovery(Seq(new TopicPartition("test-a", 1)), logManager, logManager.liveLogDirs.head)
   }
 
   private def verifyCheckpointRecovery(topicPartitions: Seq[TopicPartition], logManager: LogManager, logDir: File): Unit = {
-    val logs = topicPartitions.map(logManager.getOrCreateLog(_, () => logConfig))
+    val logs = topicPartitions.map(logManager.getOrCreateLog(_))
     logs.foreach { log =>
       for (_ <- 0 until 50)
         log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
@@ -418,16 +420,18 @@ class LogManagerTest {
     }
   }
 
-  private def createLogManager(logDirs: Seq[File] = Seq(this.logDir)): LogManager = {
+  private def createLogManager(logDirs: Seq[File] = Seq(this.logDir),
+                               configRepository: ConfigRepository = new CachedConfigRepository): LogManager = {
     TestUtils.createLogManager(
       defaultConfig = logConfig,
+      configRepository = configRepository,
       logDirs = logDirs,
       time = this.time)
   }
 
   @Test
   def testFileReferencesAfterAsyncDelete(): Unit = {
-    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), () => logConfig)
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0))
     val activeSegment = log.activeSegment
     val logName = activeSegment.log.file.getName
     val indexName = activeSegment.offsetIndex.file.getName
@@ -464,7 +468,7 @@ class LogManagerTest {
   @Test
   def testCreateAndDeleteOverlyLongTopic(): Unit = {
     val invalidTopicName = String.join("", Collections.nCopies(253, "x"))
-    logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), () => logConfig)
+    logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0))
     logManager.asyncDelete(new TopicPartition(invalidTopicName, 0))
   }
 
@@ -477,7 +481,7 @@ class LogManagerTest {
       new TopicPartition("test-b", 0),
       new TopicPartition("test-b", 1))
 
-    val allLogs = tps.map(logManager.getOrCreateLog(_, () => logConfig))
+    val allLogs = tps.map(logManager.getOrCreateLog(_))
     allLogs.foreach { log =>
       for (_ <- 0 until 50)
         log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0)
@@ -504,52 +508,53 @@ class LogManagerTest {
    */
   @Test
   def testTopicConfigChangeUpdatesLogConfig(): Unit = {
+    logManager.shutdown()
+    val spyConfigRepository = spy(new CachedConfigRepository)
+    logManager = createLogManager(configRepository = spyConfigRepository)
+    val spyLogManager = spy(logManager)
+    val mockLog = mock(classOf[Log])
+
     val testTopicOne = "test-topic-one"
     val testTopicTwo = "test-topic-two"
-    val testTopicOnePartition: TopicPartition = new TopicPartition(testTopicOne, 1)
-    val testTopicTwoPartition: TopicPartition = new TopicPartition(testTopicTwo, 1)
-    val mockLog: Log = EasyMock.mock(classOf[Log])
+    val testTopicOnePartition = new TopicPartition(testTopicOne, 1)
+    val testTopicTwoPartition = new TopicPartition(testTopicTwo, 1)
 
-    logManager.initializingLog(testTopicOnePartition)
-    logManager.initializingLog(testTopicTwoPartition)
+    spyLogManager.initializingLog(testTopicOnePartition)
+    spyLogManager.initializingLog(testTopicTwoPartition)
 
-    logManager.topicConfigUpdated(testTopicOne)
+    spyLogManager.topicConfigUpdated(testTopicOne)
 
-    val logConfig: LogConfig = null
-    var configUpdated = false
-    logManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog), () => {
-      configUpdated = true
-      logConfig
-    })
-    assertTrue(configUpdated)
+    spyLogManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog))
+    spyLogManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog))
 
-    var configNotUpdated = true
-    logManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog), () => {
-      configNotUpdated = false
-      logConfig
-    })
-    assertTrue(configNotUpdated)
+    // testTopicOne configs loaded again due to the update
+    verify(spyLogManager).initializingLog(ArgumentMatchers.eq(testTopicOnePartition))
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(testTopicOnePartition), ArgumentMatchers.any())
+    verify(spyConfigRepository, times(1)).topicConfig(testTopicOne)
+
+    // testTopicTwo configs not loaded again since there was no update
+    verify(spyLogManager).initializingLog(ArgumentMatchers.eq(testTopicTwoPartition))
+    verify(spyLogManager).finishedInitializingLog(ArgumentMatchers.eq(testTopicTwoPartition), ArgumentMatchers.any())
+    verify(spyConfigRepository, never).topicConfig(testTopicTwo)
   }
 
   /**
    * Test if an error occurs when creating log, log manager removes corresponding
-   * topic partition from the list of initializing partitions.
+   * topic partition from the list of initializing partitions and no configs are retrieved.
    */
   @Test
   def testConfigChangeGetsCleanedUp(): Unit = {
-    val testTopicPartition: TopicPartition = new TopicPartition("test-topic", 1)
-
-    logManager.initializingLog(testTopicPartition)
+    logManager.shutdown()
+    val spyConfigRepository = spy(new CachedConfigRepository)
+    logManager = createLogManager(configRepository = spyConfigRepository)
+    val spyLogManager = spy(logManager)
 
-    val logConfig: LogConfig = null
-    var configUpdateNotCalled = true
-    logManager.finishedInitializingLog(testTopicPartition, None, () => {
-      configUpdateNotCalled = false
-      logConfig
-    })
+    val testTopicPartition = new TopicPartition("test-topic", 1)
+    spyLogManager.initializingLog(testTopicPartition)
+    spyLogManager.finishedInitializingLog(testTopicPartition, None)
 
     assertTrue(logManager.partitionsInitializing.isEmpty)
-    assertTrue(configUpdateNotCalled)
+    verify(spyConfigRepository, never).topicConfig(testTopicPartition.topic)
   }
 
   /**
@@ -558,29 +563,27 @@ class LogManagerTest {
    */
   @Test
   def testBrokerConfigChangeDeliveredToAllLogs(): Unit = {
+    logManager.shutdown()
+    val spyConfigRepository = spy(new CachedConfigRepository)
+    logManager = createLogManager(configRepository = spyConfigRepository)
+    val spyLogManager = spy(logManager)
+    val mockLog = mock(classOf[Log])
+
     val testTopicOne = "test-topic-one"
     val testTopicTwo = "test-topic-two"
-    val testTopicOnePartition: TopicPartition = new TopicPartition(testTopicOne, 1)
-    val testTopicTwoPartition: TopicPartition = new TopicPartition(testTopicTwo, 1)
-    val mockLog: Log = EasyMock.mock(classOf[Log])
+    val testTopicOnePartition = new TopicPartition(testTopicOne, 1)
+    val testTopicTwoPartition = new TopicPartition(testTopicTwo, 1)
 
-    logManager.initializingLog(testTopicOnePartition)
-    logManager.initializingLog(testTopicTwoPartition)
+    spyLogManager.initializingLog(testTopicOnePartition)
+    spyLogManager.initializingLog(testTopicTwoPartition)
 
-    logManager.brokerConfigUpdated()
+    spyLogManager.brokerConfigUpdated()
 
-    val logConfig: LogConfig = null
-    var totalChanges = 0
-    logManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog), () => {
-      totalChanges += 1
-      logConfig
-    })
-    logManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog), () => {
-      totalChanges += 1
-      logConfig
-    })
+    spyLogManager.finishedInitializingLog(testTopicOnePartition, Some(mockLog))
+    spyLogManager.finishedInitializingLog(testTopicTwoPartition, Some(mockLog))
 
-    assertEquals(2, totalChanges)
+    verify(spyConfigRepository, times(1)).topicConfig(testTopicOne)
+    verify(spyConfigRepository, times(1)).topicConfig(testTopicTwo)
   }
 
   /**
@@ -613,7 +616,7 @@ class LogManagerTest {
     }
 
     // Create the Log and assert that the metrics are present
-    logManager.getOrCreateLog(tp, () => logConfig)
+    logManager.getOrCreateLog(tp)
     verifyMetrics()
 
     // Trigger the deletion and assert that the metrics have been removed
@@ -621,7 +624,7 @@ class LogManagerTest {
     assertTrue(logMetrics.isEmpty)
 
     // Recreate the Log and assert that the metrics are present
-    logManager.getOrCreateLog(tp, () => logConfig)
+    logManager.getOrCreateLog(tp)
     verifyMetrics()
 
     // Advance time past the file deletion delay and assert that the removed log has been deleted but the metrics
@@ -636,7 +639,7 @@ class LogManagerTest {
     val dir1 = TestUtils.tempDir()
     val dir2 = TestUtils.tempDir()
     logManager = createLogManager(Seq(dir1, dir2))
-    logManager.startup()
+    logManager.startup(Set.empty)
 
     val topicName = "future-log"
     def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.
@@ -654,9 +657,9 @@ class LogManagerTest {
 
     // Create the current and future logs and verify that metrics are present for both current and future logs
     logManager.maybeUpdatePreferredLogDir(tp, dir1.getAbsolutePath)
-    logManager.getOrCreateLog(tp, () => logConfig)
+    logManager.getOrCreateLog(tp)
     logManager.maybeUpdatePreferredLogDir(tp, dir2.getAbsolutePath)
-    logManager.getOrCreateLog(tp, () => logConfig, isFuture = true)
+    logManager.getOrCreateLog(tp, isFuture = true)
     verifyMetrics(2)
 
     // Replace the current log with the future one and verify that only one set of metrics is present
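
The LogManagerTest hunks above replace the old `() => LogConfig` suppliers and
hand-rolled counters with Mockito spies: the tests now simply assert whether
`ConfigRepository.topicConfig` was consulted. A minimal, self-contained sketch
of that spy/verify pattern -- against a toy repository, not the Kafka classes,
and assuming only mockito-core on the classpath:

    import java.util.Properties
    import org.mockito.Mockito.{never, spy, times, verify}

    // Toy stand-in for a config repository; purely illustrative.
    class ToyRepository {
      def topicConfig(topic: String): Properties = new Properties()
    }

    object SpyVerifySketch extends App {
      val repo = spy(new ToyRepository)
      repo.topicConfig("test-topic-one")                    // queried once
      verify(repo, times(1)).topicConfig("test-topic-one")  // passes
      verify(repo, never).topicConfig("test-topic-two")     // never queried: passes
    }

The bare `never` (rather than `never()`) mirrors the assertion style used in
the hunks above.
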
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e969410..b39dd7a 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,6 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
-import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.{Callable, Executors}
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
@@ -31,6 +30,7 @@ import kafka.log.Log.DeleteDirSuffix
 import kafka.metrics.KafkaYammerMetrics
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import kafka.server.metadata.CachedConfigRepository
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogDirFailureChannel, LogOffsetMetadata, PartitionMetadataFile}
 import kafka.utils._
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
@@ -42,7 +42,6 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.metadata.BrokerState
 import org.easymock.EasyMock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -95,15 +94,14 @@ class LogTest {
     // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
     // flag and to inject a runtime error
     def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File]): LogManager = {
-      new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], topicConfigs = Map(),
+      new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], new CachedConfigRepository(),
         initialDefaultConfig = logConfig, cleanerConfig = CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
         flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 10000L, flushStartOffsetCheckpointMs = 10000L,
         retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, scheduler = time.scheduler, time = time,
-        brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
         brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new LogDirFailureChannel(logDirs.size)) {
 
          override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
-                     logStartOffsets: Map[TopicPartition, Long]): Log = {
+                     logStartOffsets: Map[TopicPartition, Long], topicConfigs: Map[String, LogConfig]): Log = {
 
           val topicPartition = Log.parseTopicPartitionName(logDir)
           val config = topicConfigs.getOrElse(topicPartition.topic, currentDefaultConfig)
@@ -130,28 +128,28 @@ class LogTest {
 
     val cleanShutdownFile = new File(logDir, Log.CleanShutdownFile)
     val logManager: LogManager = interceptedLogManager(logConfig, logDirs)
-    log = logManager.getOrCreateLog(topicPartition, () => logConfig, isNew = true)
+    log = logManager.getOrCreateLog(topicPartition, isNew = true)
 
     // Load logs after a clean shutdown
     Files.createFile(cleanShutdownFile.toPath)
     cleanShutdownInterceptedValue = false
-    logManager.loadLogs()
+    logManager.loadLogs(logManager.fetchTopicConfigOverrides(Set.empty))
     assertTrue(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
     assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed")
     // Load logs without clean shutdown file
     cleanShutdownInterceptedValue = true
-    logManager.loadLogs()
+    logManager.loadLogs(logManager.fetchTopicConfigOverrides(Set.empty))
     assertFalse(cleanShutdownInterceptedValue, "Unexpected value intercepted for clean shutdown flag")
     assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not exist after loadLogs has completed")
     // Create clean shutdown file and then simulate error while loading logs such that log loading does not complete.
     Files.createFile(cleanShutdownFile.toPath)
     simulateError = true
-    assertThrows(classOf[RuntimeException], () => logManager.loadLogs())
+    assertThrows(classOf[RuntimeException], () => logManager.loadLogs(logManager.fetchTopicConfigOverrides(Set.empty)))
     assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
     // Do not simulate an error on the next call to LogManager#loadLogs. LogManager must understand that the log had an unclean shutdown the last time.
     simulateError = false
     cleanShutdownInterceptedValue = true
-    logManager.loadLogs()
+    logManager.loadLogs(logManager.fetchTopicConfigOverrides(Set.empty))
     assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
   }
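
The LogTest changes above carry the core behavioral shift: `loadLogs` now takes
the topic config overrides as an argument, fetched via
`fetchTopicConfigOverrides` at the moment recovery actually runs rather than at
construction time. A self-contained toy model of that ordering (all names here
are hypothetical, not the Kafka API):

    // Construction is passive; recovery runs only when startup() is called,
    // using whatever overrides are fetchable at that point.
    class DeferredRecoveryManager(fetchOverrides: Set[String] => Map[String, String]) {
      @volatile private var recovered = false

      def startup(topicNames: Set[String]): Unit =
        loadLogs(fetchOverrides(topicNames)) // configs resolved here, not in the constructor

      private def loadLogs(overrides: Map[String, String]): Unit = {
        // the real code would recover each log with its override, or the current default
        recovered = true
      }

      def isRecovered: Boolean = recovered
    }

    object DeferredRecoverySketch extends App {
      val mgr = new DeferredRecoveryManager(_ => Map("test-topic" -> "cleanup.policy=compact"))
      assert(!mgr.isRecovered) // constructing recovered nothing
      mgr.startup(Set("test-topic"))
      assert(mgr.isRecovered)  // recovery ran inside startup()
    }
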
 
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index a7a65f6..c39ae7c 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -75,7 +75,7 @@ class HighwatermarkPersistenceTest {
       val tp0 = new TopicPartition(topic, 0)
       val partition0 = replicaManager.createPartition(tp0)
       // create leader and follower replicas
-      val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0), () => LogConfig())
+      val log0 = logManagers.head.getOrCreateLog(new TopicPartition(topic, 0))
       partition0.setLog(log0, isFutureLog = false)
 
       partition0.updateAssignmentAndIsr(
@@ -124,7 +124,7 @@ class HighwatermarkPersistenceTest {
       val t1p0 = new TopicPartition(topic1, 0)
       val topic1Partition0 = replicaManager.createPartition(t1p0)
       // create leader log
-      val topic1Log0 = logManagers.head.getOrCreateLog(t1p0, () => LogConfig())
+      val topic1Log0 = logManagers.head.getOrCreateLog(t1p0)
       // create a local replica for topic1
       topic1Partition0.setLog(topic1Log0, isFutureLog = false)
       replicaManager.checkpointHighWatermarks()
@@ -141,7 +141,7 @@ class HighwatermarkPersistenceTest {
       val t2p0 = new TopicPartition(topic2, 0)
       val topic2Partition0 = replicaManager.createPartition(t2p0)
       // create leader log
-      val topic2Log0 = logManagers.head.getOrCreateLog(t2p0, () => LogConfig())
+      val topic2Log0 = logManagers.head.getOrCreateLog(t2p0)
       // create a local replica for topic2
       topic2Partition0.setLog(topic2Log0, isFutureLog = false)
       replicaManager.checkpointHighWatermarks()
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 58325e1..1ce8006 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -163,7 +163,7 @@ class LogOffsetTest extends BaseRequestTest {
     createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(topicPartition, () => logManager.initialDefaultConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
 
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -192,7 +192,7 @@ class LogOffsetTest extends BaseRequestTest {
     createTopic(topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.getOrCreateLog(topicPartition, () => logManager.initialDefaultConfig)
+    val log = logManager.getOrCreateLog(topicPartition)
     for (_ <- 0 until 20)
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 7ae7211..a21a016 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1505,8 +1505,7 @@ class ReplicaManagerTest {
     val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
     EasyMock.expect(mockLogMgr.getOrCreateLog(EasyMock.eq(topicPartitionObj),
-      EasyMock.anyObject[() => LogConfig](), isNew = EasyMock.eq(false),
-      isFuture = EasyMock.eq(false))).andReturn(mockLog).anyTimes
+      isNew = EasyMock.eq(false), isFuture = EasyMock.eq(false))).andReturn(mockLog).anyTimes
     if (expectTruncation) {
       EasyMock.expect(mockLogMgr.truncateTo(Map(topicPartitionObj -> offsetFromLeader),
         isFuture = false)).once
@@ -1515,7 +1514,7 @@ class ReplicaManagerTest {
     EasyMock.expect(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).andReturn(None)
 
     EasyMock.expect(mockLogMgr.finishedInitializingLog(
-      EasyMock.eq(topicPartitionObj), EasyMock.anyObject(), EasyMock.anyObject())).anyTimes
+      EasyMock.eq(topicPartitionObj), EasyMock.anyObject())).anyTimes
 
     EasyMock.replay(mockLogMgr)
 
@@ -2040,7 +2039,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
 
     val tp0 = new TopicPartition(topic, 0)
-    val log = replicaManager.logManager.getOrCreateLog(tp0, () => LogConfig(), true)
+    val log = replicaManager.logManager.getOrCreateLog(tp0, true)
 
     if (throwIOException) {
       // Delete the underlying directory to trigger a KafkaStorageException
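
With the config supplier gone from `getOrCreateLog` and `finishedInitializingLog`,
the EasyMock expectations above lose a matcher each. A minimal sketch of the same
expectation style against a toy trait (illustrative only, not Kafka's `LogManager`;
assumes easymock on the classpath):

    import org.easymock.EasyMock

    // Toy trait mirroring the shape of the simplified call.
    trait ToyLogManager {
      def getOrCreateLog(topic: String, isNew: Boolean, isFuture: Boolean): String
    }

    object EasyMockSketch extends App {
      val mockLogMgr = EasyMock.createMock(classOf[ToyLogManager])
      EasyMock.expect(mockLogMgr.getOrCreateLog(EasyMock.eq("t"),
        isNew = EasyMock.eq(false), isFuture = EasyMock.eq(false))).andReturn("log").anyTimes
      EasyMock.replay(mockLogMgr)
      assert(mockLogMgr.getOrCreateLog("t", isNew = false, isFuture = false) == "log")
      EasyMock.verify(mockLogMgr)
    }
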
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bbbd7a9..6a7db80 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -23,10 +23,9 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.{Files, StandardOpenOption}
 import java.security.cert.X509Certificate
 import java.time.Duration
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Arrays, Collections, Properties}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-
 import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
@@ -37,7 +36,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import com.yammer.metrics.core.Meter
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.metrics.KafkaYammerMetrics
-import kafka.server.metadata.{ConfigRepository, CachedConfigRepository, MetadataBroker}
+import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
 import kafka.utils.Implicits._
 import kafka.zk._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -61,7 +60,6 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition}
-import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.apache.zookeeper.KeeperException.SessionExpiredException
@@ -1074,11 +1072,12 @@ object TestUtils extends Logging {
    */
   def createLogManager(logDirs: Seq[File] = Seq.empty[File],
                        defaultConfig: LogConfig = LogConfig(),
+                       configRepository: ConfigRepository = new CachedConfigRepository,
                        cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
                        time: MockTime = new MockTime()): LogManager = {
     new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
                    initialOfflineDirs = Array.empty[File],
-                   topicConfigs = Map(),
+                   configRepository = configRepository,
                    initialDefaultConfig = defaultConfig,
                    cleanerConfig = cleanerConfig,
                    recoveryThreadsPerDataDir = 4,
@@ -1089,7 +1088,6 @@ object TestUtils extends Logging {
                    maxPidExpirationMs = 60 * 60 * 1000,
                    scheduler = time.scheduler,
                    time = time,
-                   brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING),
                    brokerTopicStats = new BrokerTopicStats,
                    logDirFailureChannel = new LogDirFailureChannel(logDirs.size))
   }
@@ -1156,7 +1154,7 @@ object TestUtils extends Logging {
     new MockIsrChangeListener()
   }
 
-  def createConfigRepository(topic: String, props: Properties): ConfigRepository = {
+  def createConfigRepository(topic: String, props: Properties): CachedConfigRepository = {
     val configRepository = new CachedConfigRepository()
     props.entrySet().forEach(e => configRepository.setTopicConfig(topic, e.getKey.toString, e.getValue.toString))
     configRepository
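
`createConfigRepository` now returns the concrete `CachedConfigRepository` so that
callers can keep calling `setTopicConfig` on the result, and `createLogManager`
threads the repository through to the `LogManager` constructor. A self-contained
sketch of the cache-and-copy pattern this relies on (a toy stand-in, not the real
`kafka.server.metadata.CachedConfigRepository`):

    import java.util.Properties
    import scala.collection.mutable

    // Per-topic config cache handing out Properties copies; illustrative only.
    class ToyConfigRepository {
      private val cache = mutable.Map.empty[String, Properties]

      def setTopicConfig(topic: String, key: String, value: String): Unit = synchronized {
        cache.getOrElseUpdate(topic, new Properties()).setProperty(key, value)
      }

      def topicConfig(topic: String): Properties = synchronized {
        val copy = new Properties()
        cache.get(topic).foreach(p => copy.putAll(p))
        copy
      }
    }

    object ToyConfigRepositorySketch extends App {
      val repo = new ToyConfigRepository
      val props = new Properties()
      props.setProperty("cleanup.policy", "compact")
      props.entrySet().forEach(e => repo.setTopicConfig("topic", e.getKey.toString, e.getValue.toString))
      assert(repo.topicConfig("topic").getProperty("cleanup.policy") == "compact")
      assert(repo.topicConfig("missing").isEmpty) // unknown topics yield an empty Properties
    }
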
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index ed87697..8cc30fc 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -56,7 +56,6 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metadata.BrokerState;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -89,7 +88,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -124,7 +122,7 @@ public class ReplicaFetcherThreadBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new scala.collection.mutable.HashMap<>(),
+                new CachedConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
@@ -134,7 +132,6 @@ public class ReplicaFetcherThreadBenchmark {
                 1000L,
                 60000,
                 scheduler,
-                new AtomicReference<>(BrokerState.NOT_RUNNING),
                 brokerTopicStats,
                 logDirFailureChannel,
                 Time.SYSTEM);
@@ -159,7 +156,7 @@ public class ReplicaFetcherThreadBenchmark {
             Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
             AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class);
             Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
-                    0, Time.SYSTEM, new CachedConfigRepository(), isrChangeListener, new DelayedOperationsMock(tp),
+                    0, Time.SYSTEM, isrChangeListener, new DelayedOperationsMock(tp),
                     Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
 
             partition.makeFollower(partitionState, offsetCheckpoints);
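
This benchmark and the two that follow drop the same pair of constructor
arguments: the per-topic config map becomes a `CachedConfigRepository`, and the
shared `AtomicReference<BrokerState>` disappears because `LogManager` no longer
drives broker state transitions itself. A toy Scala model of that ownership
change (names hypothetical, not the Kafka API):

    import java.util.concurrent.atomic.AtomicReference

    object BrokerStateSketch extends App {
      sealed trait State
      case object Starting extends State
      case object Recovery extends State

      // The manager does its work but no longer mutates any shared broker state.
      class Manager { def startup(): Unit = () }

      val brokerState = new AtomicReference[State](Starting)
      val manager = new Manager
      brokerState.set(Recovery) // the caller owns the transition...
      manager.startup()         // ...the manager just does its job
      assert(brokerState.get == Recovery)
    }
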
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index e78d937..9f33ceb 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metadata.BrokerState;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -68,7 +67,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -100,7 +98,7 @@ public class PartitionMakeFollowerBenchmark {
         LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
             JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-            new scala.collection.mutable.HashMap<>(),
+            new CachedConfigRepository(),
             logConfig,
             new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
             1,
@@ -110,7 +108,6 @@ public class PartitionMakeFollowerBenchmark {
             1000L,
             60000,
             scheduler,
-            new AtomicReference<>(BrokerState.NOT_RUNNING),
             brokerTopicStats,
             logDirFailureChannel,
             Time.SYSTEM);
@@ -122,7 +119,7 @@ public class PartitionMakeFollowerBenchmark {
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(tp, 100,
             ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-            new CachedConfigRepository(), isrChangeListener, delayedOperations,
+            isrChangeListener, delayedOperations,
             Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.createLogIfNotExists(true, false, offsetCheckpoints);
         executorService.submit((Runnable) () -> {
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index b0d76a4..1230a1c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -36,7 +36,6 @@ import kafka.utils.KafkaScheduler;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metadata.BrokerState;
 import org.mockito.Mockito;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -60,7 +59,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
@@ -85,7 +83,7 @@ public class UpdateFollowerFetchStateBenchmark {
         List<File> logDirs = Collections.singletonList(logDir);
         logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
                 JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
-                new scala.collection.mutable.HashMap<>(),
+                new CachedConfigRepository(),
                 logConfig,
                 new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
                 1,
@@ -95,7 +93,6 @@ public class UpdateFollowerFetchStateBenchmark {
                 1000L,
                 60000,
                 scheduler,
-                new AtomicReference<>(BrokerState.NOT_RUNNING),
                 brokerTopicStats,
                 logDirFailureChannel,
                 Time.SYSTEM);
@@ -120,7 +117,7 @@ public class UpdateFollowerFetchStateBenchmark {
         AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class);
         partition = new Partition(topicPartition, 100,
                 ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
-                new CachedConfigRepository(), isrChangeListener, delayedOperations,
+                isrChangeListener, delayedOperations,
                 Mockito.mock(MetadataCache.class), logManager, alterIsrManager);
         partition.makeLeader(partitionState, offsetCheckpoints);
     }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 0c21327..45d2cb1 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -104,7 +104,7 @@ public class CheckpointBench {
         final List<File> files =
             JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
-                LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
+                LogConfig.apply(), new CachedConfigRepository(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                         1024 * 1024, 32 * 1024 * 1024,
                         Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
         scheduler.startup();
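
Put together, a test that wants the new wiring can compose it entirely from the
signatures visible in the diffs above. A hedged usage sketch (the wrapper object
is hypothetical; it assumes the Kafka core test classpath):

    import kafka.server.metadata.CachedConfigRepository
    import kafka.utils.TestUtils
    import org.apache.kafka.common.TopicPartition

    object NewWiringSketch {
      def run(): Unit = {
        // Explicit repository with one topic override, as in createConfigRepository.
        val configRepository = new CachedConfigRepository()
        configRepository.setTopicConfig("test-topic", "cleanup.policy", "compact")

        val logManager = TestUtils.createLogManager(configRepository = configRepository)
        logManager.startup(Set.empty) // with this patch, recovery runs here, not at construction
        try {
          val log = logManager.getOrCreateLog(new TopicPartition("test-topic", 0))
          log.flush() // exercise the log
        } finally {
          logManager.shutdown()
        }
      }
    }
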