You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/07 18:43:49 UTC

[kafka] branch trunk updated: MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 100e8746713 MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)
100e8746713 is described below

commit 100e87467135514170bdd4a0614f9b1c37c04c48
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Dec 7 10:43:34 2022 -0800

    MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)
    
    Split out the logic for applying dynamic configurations to a KafkaConfig object from
    BrokerMetadataPublisher into a new class, DynamicConfigPublisher. This will allow the
    ControllerServer to also run this code, in a follow-up change.
    
    Create separate KafkaConfig objects for BrokerServer and ControllerServer. This is necessary
    because the controller will apply configuration changes as soon as its raft client catches up to
    the high water mark, whereas the broker will wait for the active controller to acknowledge it has
    caught up in a heartbeat response. So when running in combined mode, we want two separate
    KafkaConfig objects that are changed at different times.
    
    Minor changes: improve the error message shown when broker metadata catch-up fails. Fix incorrect
    indentation in checkstyle/import-control.xml. Invoke AppInfoParser.unregisterAppInfo from
    SharedServer.stop so that it happens only when both the controller and broker have shut down.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 checkstyle/import-control.xml                      |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  24 +++--
 .../main/scala/kafka/server/ControllerServer.scala |   6 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  22 +++--
 .../server/metadata/BrokerMetadataPublisher.scala  |  61 +-----------
 .../server/metadata/DynamicConfigPublisher.scala   | 103 +++++++++++++++++++++
 .../metadata/BrokerMetadataPublisherTest.scala     |  45 +++------
 7 files changed, 150 insertions(+), 113 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0609ee7fd6c..bd05521964e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -306,7 +306,7 @@
   </subpackage>
 
   <subpackage name="queue">
-      <allow pkg="org.apache.kafka.test" />
+    <allow pkg="org.apache.kafka.test" />
   </subpackage>
 
   <subpackage name="clients">
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d6b4fa92c3a..f55ceebffcc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -30,7 +30,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
 import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -39,7 +39,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
-import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -71,7 +71,7 @@ class BrokerServer(
   val initialOfflineDirs: Seq[String],
 ) extends KafkaBroker {
   val threadNamePrefix = sharedServer.threadNamePrefix
-  val config = sharedServer.config
+  val config = sharedServer.brokerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
 
@@ -420,8 +420,13 @@ class BrokerServer(
         config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)
 
-      // Block until we've caught up with the latest metadata from the controller quorum.
-      lifecycleManager.initialCatchUpFuture.get()
+      info("Waiting for broker metadata to catch up.")
+      try {
+        lifecycleManager.initialCatchUpFuture.get()
+      } catch {
+        case t: Throwable => throw new RuntimeException("Received a fatal error while " +
+          "waiting for the broker to catch up with the current cluster metadata.", t)
+      }
 
       // Apply the metadata log changes that we've accumulated.
       metadataPublisher = new BrokerMetadataPublisher(config,
@@ -431,7 +436,11 @@ class BrokerServer(
         groupCoordinator,
         transactionCoordinator,
         clientQuotaMetadataManager,
-        dynamicConfigHandlers.toMap,
+        new DynamicConfigPublisher(
+          config,
+          sharedServer.metadataPublishingFaultHandler,
+          dynamicConfigHandlers.toMap,
+        "broker"),
         authorizer,
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
@@ -567,9 +576,8 @@ class BrokerServer(
       isShuttingDown.set(false)
 
       CoreUtils.swallow(lifecycleManager.close(), this)
+      CoreUtils.swallow(config.dynamicConfig.clear(), this)
       sharedServer.stopForBroker()
-
-      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, config.nodeId.toString, metrics), this)
       info("shut down completed")
     } catch {
       case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index f73088b30f0..2bd518cde2a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -59,14 +59,12 @@ class ControllerServer(
 
   import kafka.server.Server._
 
-  val config = sharedServer.config
+  val config = sharedServer.controllerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
   val threadNamePrefix = sharedServer.threadNamePrefix.getOrElse("")
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
 
-  config.dynamicConfig.initialize(zkClientOpt = None)
-
   val lock = new ReentrantLock()
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -109,6 +107,7 @@ class ControllerServer(
     if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
     try {
       info("Starting controller")
+      config.dynamicConfig.initialize(zkClientOpt = None)
 
       maybeChangeStatus(STARTING, STARTED)
       this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
@@ -284,6 +283,7 @@ class ControllerServer(
       createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
+      CoreUtils.swallow(config.dynamicConfig.clear(), this)
       sharedServer.stopForController()
     } catch {
       case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index a420c9afa38..8b647e7464f 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -19,10 +19,11 @@ package kafka.server
 
 import kafka.raft.KafkaRaftManager
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.server.Server.MetricsPrefix
 import kafka.server.metadata.BrokerServerMetrics
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.controller.QuorumControllerMetrics
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -77,7 +78,7 @@ class StandardFaultHandlerFactory extends FaultHandlerFactory {
  * make debugging easier and reduce the chance of resource leaks.
  */
 class SharedServer(
-  val config: KafkaConfig,
+  private val sharedServerConfig: KafkaConfig,
   val metaProps: MetaProperties,
   val time: Time,
   private val _metrics: Metrics,
@@ -85,11 +86,13 @@ class SharedServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val faultHandlerFactory: FaultHandlerFactory
 ) extends Logging {
-  private val logContext: LogContext = new LogContext(s"[SharedServer id=${config.nodeId}] ")
+  private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
   this.logIdent = logContext.logPrefix
   private var started = false
   private var usedByBroker: Boolean = false
   private var usedByController: Boolean = false
+  val brokerConfig = new KafkaConfig(sharedServerConfig.props, false, None)
+  val controllerConfig = new KafkaConfig(sharedServerConfig.props, false, None)
   @volatile var metrics: Metrics = _metrics
   @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
   @volatile var brokerMetrics: BrokerServerMetrics = _
@@ -143,7 +146,7 @@ class SharedServer(
    * The fault handler to use when metadata loading fails.
    */
   def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build("metadata loading",
-    fatal = config.processRoles.contains(ControllerRole),
+    fatal = sharedServerConfig.processRoles.contains(ControllerRole),
     action = () => SharedServer.this.synchronized {
       if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
       if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
@@ -188,17 +191,17 @@ class SharedServer(
           // This is only done in tests.
           metrics = new Metrics()
         }
-        config.dynamicConfig.initialize(zkClientOpt = None)
+        sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None)
 
-        if (config.processRoles.contains(BrokerRole)) {
+        if (sharedServerConfig.processRoles.contains(BrokerRole)) {
           brokerMetrics = BrokerServerMetrics(metrics)
         }
-        if (config.processRoles.contains(ControllerRole)) {
+        if (sharedServerConfig.processRoles.contains(ControllerRole)) {
           controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
         }
         raftManager = new KafkaRaftManager[ApiMessageAndVersion](
           metaProps,
-          config,
+          sharedServerConfig,
           new MetadataRecordSerde,
           KafkaRaftServer.MetadataPartition,
           KafkaRaftServer.MetadataTopicId,
@@ -248,8 +251,7 @@ class SharedServer(
         CoreUtils.swallow(metrics.close(), this)
         metrics = null
       }
-      // Clear all reconfigurable instances stored in DynamicBrokerConfig
-      CoreUtils.swallow(config.dynamicConfig.clear(), this)
+      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)
       started = false
     }
   }
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 0192bb4afcf..933a6bf8924 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -22,11 +22,9 @@ import java.util.concurrent.atomic.AtomicLong
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.ConfigAdminManager.toLoggableProps
-import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
@@ -103,7 +101,7 @@ class BrokerMetadataPublisher(
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
   clientQuotaMetadataManager: ClientQuotaMetadataManager,
-  dynamicConfigHandlers: Map[String, ConfigHandler],
+  var dynamicConfigPublisher: DynamicConfigPublisher,
   private val _authorizer: Option[Authorizer],
   fatalFaultHandler: FaultHandler,
   metadataPublishingFaultHandler: FaultHandler
@@ -211,61 +209,10 @@ class BrokerMetadataPublisher(
       }
 
       // Apply configuration deltas.
-      Option(delta.configsDelta()).foreach { configsDelta =>
-        configsDelta.changes().keySet().forEach { resource =>
-          val props = newImage.configs().configProperties(resource)
-          resource.`type`() match {
-            case TOPIC =>
-              try {
-                // Apply changes to a topic's dynamic configuration.
-                info(s"Updating topic ${resource.name()} with new configuration : " +
-                  toLoggableProps(resource, props).mkString(","))
-                dynamicConfigHandlers(ConfigType.Topic).
-                  processConfigChanges(resource.name(), props)
-              } catch {
-                case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating topic " +
-                  s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                  s"in ${deltaName}", t)
-              }
-            case BROKER =>
-              if (resource.name().isEmpty) {
-                try {
-                  // Apply changes to "cluster configs" (also known as default BROKER configs).
-                  // These are stored in KRaft with an empty name field.
-                  info("Updating cluster configuration : " +
-                    toLoggableProps(resource, props).mkString(","))
-                  dynamicConfigHandlers(ConfigType.Broker).
-                    processConfigChanges(ConfigEntityName.Default, props)
-                } catch {
-                  case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
-                    s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                    s"in ${deltaName}", t)
-                }
-              } else if (resource.name() == brokerId.toString) {
-                try {
-                  // Apply changes to this broker's dynamic configuration.
-                  info(s"Updating broker $brokerId with new configuration : " +
-                    toLoggableProps(resource, props).mkString(","))
-                  dynamicConfigHandlers(ConfigType.Broker).
-                    processConfigChanges(resource.name(), props)
-                  // When applying a per broker config (not a cluster config), we also
-                  // reload any associated file. For example, if the ssl.keystore is still
-                  // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
-                  // have changed. This doesn't apply to topic configs or cluster configs.
-                  reloadUpdatedFilesWithoutConfigChange(props)
-                } catch {
-                  case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
-                    s"broker with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
-                    s"in ${deltaName}", t)
-                }
-              }
-            case _ => // nothing to do
-          }
-        }
-      }
+      dynamicConfigPublisher.publish(delta, newImage)
 
+      // Apply client quotas delta.
       try {
-        // Apply client quotas delta.
         Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
           clientQuotaMetadataManager.update(clientQuotasDelta)
         }
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
new file mode 100644
index 00000000000..12ff51d4039
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Properties
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
+import kafka.utils.Logging
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicConfigPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  dynamicConfigHandlers: Map[String, ConfigHandler],
+  nodeType: String
+) extends Logging {
+  logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] "
+
+  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+    val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+    try {
+      // Apply configuration deltas.
+      Option(delta.configsDelta()).foreach { configsDelta =>
+        configsDelta.changes().keySet().forEach { resource =>
+          val props = newImage.configs().configProperties(resource)
+          resource.`type`() match {
+            case TOPIC =>
+              dynamicConfigHandlers.get(ConfigType.Topic).foreach(topicConfigHandler =>
+                try {
+                  // Apply changes to a topic's dynamic configuration.
+                  info(s"Updating topic ${resource.name()} with new configuration : " +
+                    toLoggableProps(resource, props).mkString(","))
+                  topicConfigHandler.processConfigChanges(resource.name(), props)
+                } catch {
+                  case t: Throwable => faultHandler.handleFault("Error updating topic " +
+                    s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                    s"in ${deltaName}", t)
+                }
+              )
+            case BROKER =>
+              dynamicConfigHandlers.get(ConfigType.Broker).foreach(nodeConfigHandler =>
+                if (resource.name().isEmpty) {
+                  try {
+                    // Apply changes to "cluster configs" (also known as default BROKER configs).
+                    // These are stored in KRaft with an empty name field.
+                    info("Updating cluster configuration : " +
+                      toLoggableProps(resource, props).mkString(","))
+                    nodeConfigHandler.processConfigChanges(ConfigEntityName.Default, props)
+                  } catch {
+                    case t: Throwable => faultHandler.handleFault("Error updating " +
+                      s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                      s"in ${deltaName}", t)
+                  }
+                } else if (resource.name() == conf.nodeId.toString) {
+                  try {
+                    // Apply changes to this node's dynamic configuration.
+                    info(s"Updating node ${conf.nodeId} with new configuration : " +
+                      toLoggableProps(resource, props).mkString(","))
+                    nodeConfigHandler.processConfigChanges(resource.name(), props)
+                    // When applying a per node config (not a cluster config), we also
+                    // reload any associated file. For example, if the ssl.keystore is still
+                    // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
+                    // have changed. This doesn't apply to topic configs or cluster configs.
+                    reloadUpdatedFilesWithoutConfigChange(props)
+                  } catch {
+                    case t: Throwable => faultHandler.handleFault("Error updating " +
+                      s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                      s"in ${deltaName}", t)
+                  }
+                }
+              )
+            case _ => // nothing to do
+          }
+        }
+      }
+    } catch {
+      case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
+        s"publishing dynamic configuration changes from ${deltaName}", t)
+    }
+  }
+
+  def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
+    conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 8874a235a52..317da428888 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,14 +17,11 @@
 
 package kafka.server.metadata
 
-import kafka.coordinator.group.GroupCoordinator
-import kafka.coordinator.transaction.TransactionCoordinator
-
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
+import kafka.log.UnifiedLog
+import kafka.server.{BrokerServer, KafkaConfig}
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -36,7 +33,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
-import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
@@ -179,29 +176,15 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
-  private def newMockPublisher(
+  private def newMockDynamicConfigPublisher(
     broker: BrokerServer,
-    logManager: LogManager,
-    replicaManager: ReplicaManager,
-    groupCoordinator: GroupCoordinator,
-    txnCoordinator: TransactionCoordinator,
-    errorHandler: FaultHandler = new MockFaultHandler("publisher")
-  ): BrokerMetadataPublisher = {
-    val mockLogManager = Mockito.mock(classOf[LogManager])
-    Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
-    Mockito.spy(new BrokerMetadataPublisher(
+    errorHandler: FaultHandler
+  ): DynamicConfigPublisher = {
+    Mockito.spy(new DynamicConfigPublisher(
       conf = broker.config,
-      metadataCache = broker.metadataCache,
-      logManager,
-      replicaManager,
-      groupCoordinator,
-      txnCoordinator,
-      clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+      faultHandler = errorHandler,
       dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
-      _authorizer = Option.empty,
-      errorHandler,
-      errorHandler
-    ))
+      nodeType = "broker"))
   }
 
   @Test
@@ -215,20 +198,14 @@ class BrokerMetadataPublisherTest {
       cluster.startup()
       cluster.waitForReadyBrokers()
       val broker = cluster.brokers().values().iterator().next()
-      val mockLogManager = Mockito.mock(classOf[LogManager])
-      Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
-      val mockReplicaManager = Mockito.mock(classOf[ReplicaManager])
-      val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator])
-      val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator])
-
-      val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator)
+      val publisher = newMockDynamicConfigPublisher(broker, cluster.nonFatalFaultHandler())
 
       val numTimesReloadCalled = new AtomicInteger(0)
       Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
         thenAnswer(new Answer[Unit]() {
           override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1)
         })
-      broker.metadataListener.alterPublisher(publisher).get()
+      broker.metadataPublisher.dynamicConfigPublisher = publisher
       val admin = Admin.create(cluster.clientProperties())
       try {
         assertEquals(0, numTimesReloadCalled.get())