You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/07 18:43:49 UTC
[kafka] branch trunk updated: MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 100e8746713 MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)
100e8746713 is described below
commit 100e87467135514170bdd4a0614f9b1c37c04c48
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Wed Dec 7 10:43:34 2022 -0800
MINOR: Move dynamic config logic to DynamicConfigPublisher (#12958)
Split out the logic for applying dynamic configurations to a KafkaConfig object from
BrokerMetadataPublisher into a new class, DynamicConfigPublisher. This will allow the
ControllerServer to also run this code, in a follow-up change.
Create separate KafkaConfig objects in BrokerServer versus ControllerServer. This is necessary
because the controller will apply configuration changes as soon as its raft client catches up to
the high water mark, whereas the broker will wait for the active controller to acknowledge it has
caught up in a heartbeat response. So when running in combined mode, we want two separate
KafkaConfig objects that are changed at different times.
Minor changes: improve the error message when catching up broker metadata fails. Fix incorrect
indentation in checkstyle/import-control.xml. Invoke AppInfoParser.unregisterAppInfo from
SharedServer.stop so that it happens only when both the controller and broker have shut down.
Reviewers: David Arthur <mu...@gmail.com>
---
checkstyle/import-control.xml | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 24 +++--
.../main/scala/kafka/server/ControllerServer.scala | 6 +-
.../src/main/scala/kafka/server/SharedServer.scala | 22 +++--
.../server/metadata/BrokerMetadataPublisher.scala | 61 +-----------
.../server/metadata/DynamicConfigPublisher.scala | 103 +++++++++++++++++++++
.../metadata/BrokerMetadataPublisherTest.scala | 45 +++------
7 files changed, 150 insertions(+), 113 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0609ee7fd6c..bd05521964e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -306,7 +306,7 @@
</subpackage>
<subpackage name="queue">
- <allow pkg="org.apache.kafka.test" />
+ <allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="clients">
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d6b4fa92c3a..f55ceebffcc 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -30,7 +30,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -39,7 +39,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
-import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
+import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
@@ -71,7 +71,7 @@ class BrokerServer(
val initialOfflineDirs: Seq[String],
) extends KafkaBroker {
val threadNamePrefix = sharedServer.threadNamePrefix
- val config = sharedServer.config
+ val config = sharedServer.brokerConfig
val time = sharedServer.time
def metrics = sharedServer.metrics
@@ -420,8 +420,13 @@ class BrokerServer(
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
- // Block until we've caught up with the latest metadata from the controller quorum.
- lifecycleManager.initialCatchUpFuture.get()
+ info("Waiting for broker metadata to catch up.")
+ try {
+ lifecycleManager.initialCatchUpFuture.get()
+ } catch {
+ case t: Throwable => throw new RuntimeException("Received a fatal error while " +
+ "waiting for the broker to catch up with the current cluster metadata.", t)
+ }
// Apply the metadata log changes that we've accumulated.
metadataPublisher = new BrokerMetadataPublisher(config,
@@ -431,7 +436,11 @@ class BrokerServer(
groupCoordinator,
transactionCoordinator,
clientQuotaMetadataManager,
- dynamicConfigHandlers.toMap,
+ new DynamicConfigPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ dynamicConfigHandlers.toMap,
+ "broker"),
authorizer,
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
@@ -567,9 +576,8 @@ class BrokerServer(
isShuttingDown.set(false)
CoreUtils.swallow(lifecycleManager.close(), this)
+ CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForBroker()
-
- CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, config.nodeId.toString, metrics), this)
info("shut down completed")
} catch {
case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index f73088b30f0..2bd518cde2a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -59,14 +59,12 @@ class ControllerServer(
import kafka.server.Server._
- val config = sharedServer.config
+ val config = sharedServer.controllerConfig
val time = sharedServer.time
def metrics = sharedServer.metrics
val threadNamePrefix = sharedServer.threadNamePrefix.getOrElse("")
def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
- config.dynamicConfig.initialize(zkClientOpt = None)
-
val lock = new ReentrantLock()
val awaitShutdownCond = lock.newCondition()
var status: ProcessStatus = SHUTDOWN
@@ -109,6 +107,7 @@ class ControllerServer(
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
try {
info("Starting controller")
+ config.dynamicConfig.initialize(zkClientOpt = None)
maybeChangeStatus(STARTING, STARTED)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
@@ -284,6 +283,7 @@ class ControllerServer(
createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
+ CoreUtils.swallow(config.dynamicConfig.clear(), this)
sharedServer.stopForController()
} catch {
case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index a420c9afa38..8b647e7464f 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -19,10 +19,11 @@ package kafka.server
import kafka.raft.KafkaRaftManager
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.server.Server.MetricsPrefix
import kafka.server.metadata.BrokerServerMetrics
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.QuorumControllerMetrics
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -77,7 +78,7 @@ class StandardFaultHandlerFactory extends FaultHandlerFactory {
* make debugging easier and reduce the chance of resource leaks.
*/
class SharedServer(
- val config: KafkaConfig,
+ private val sharedServerConfig: KafkaConfig,
val metaProps: MetaProperties,
val time: Time,
private val _metrics: Metrics,
@@ -85,11 +86,13 @@ class SharedServer(
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
val faultHandlerFactory: FaultHandlerFactory
) extends Logging {
- private val logContext: LogContext = new LogContext(s"[SharedServer id=${config.nodeId}] ")
+ private val logContext: LogContext = new LogContext(s"[SharedServer id=${sharedServerConfig.nodeId}] ")
this.logIdent = logContext.logPrefix
private var started = false
private var usedByBroker: Boolean = false
private var usedByController: Boolean = false
+ val brokerConfig = new KafkaConfig(sharedServerConfig.props, false, None)
+ val controllerConfig = new KafkaConfig(sharedServerConfig.props, false, None)
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
@@ -143,7 +146,7 @@ class SharedServer(
* The fault handler to use when metadata loading fails.
*/
def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build("metadata loading",
- fatal = config.processRoles.contains(ControllerRole),
+ fatal = sharedServerConfig.processRoles.contains(ControllerRole),
action = () => SharedServer.this.synchronized {
if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
@@ -188,17 +191,17 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}
- config.dynamicConfig.initialize(zkClientOpt = None)
+ sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None)
- if (config.processRoles.contains(BrokerRole)) {
+ if (sharedServerConfig.processRoles.contains(BrokerRole)) {
brokerMetrics = BrokerServerMetrics(metrics)
}
- if (config.processRoles.contains(ControllerRole)) {
+ if (sharedServerConfig.processRoles.contains(ControllerRole)) {
controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
}
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaProps,
- config,
+ sharedServerConfig,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
@@ -248,8 +251,7 @@ class SharedServer(
CoreUtils.swallow(metrics.close(), this)
metrics = null
}
- // Clear all reconfigurable instances stored in DynamicBrokerConfig
- CoreUtils.swallow(config.dynamicConfig.clear(), this)
+ CoreUtils.swallow(AppInfoParser.unregisterAppInfo(MetricsPrefix, sharedServerConfig.nodeId.toString, metrics), this)
started = false
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 0192bb4afcf..933a6bf8924 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -22,11 +22,9 @@ import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.ConfigAdminManager.toLoggableProps
-import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
@@ -103,7 +101,7 @@ class BrokerMetadataPublisher(
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
clientQuotaMetadataManager: ClientQuotaMetadataManager,
- dynamicConfigHandlers: Map[String, ConfigHandler],
+ var dynamicConfigPublisher: DynamicConfigPublisher,
private val _authorizer: Option[Authorizer],
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler
@@ -211,61 +209,10 @@ class BrokerMetadataPublisher(
}
// Apply configuration deltas.
- Option(delta.configsDelta()).foreach { configsDelta =>
- configsDelta.changes().keySet().forEach { resource =>
- val props = newImage.configs().configProperties(resource)
- resource.`type`() match {
- case TOPIC =>
- try {
- // Apply changes to a topic's dynamic configuration.
- info(s"Updating topic ${resource.name()} with new configuration : " +
- toLoggableProps(resource, props).mkString(","))
- dynamicConfigHandlers(ConfigType.Topic).
- processConfigChanges(resource.name(), props)
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating topic " +
- s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
- }
- case BROKER =>
- if (resource.name().isEmpty) {
- try {
- // Apply changes to "cluster configs" (also known as default BROKER configs).
- // These are stored in KRaft with an empty name field.
- info("Updating cluster configuration : " +
- toLoggableProps(resource, props).mkString(","))
- dynamicConfigHandlers(ConfigType.Broker).
- processConfigChanges(ConfigEntityName.Default, props)
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
- s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
- }
- } else if (resource.name() == brokerId.toString) {
- try {
- // Apply changes to this broker's dynamic configuration.
- info(s"Updating broker $brokerId with new configuration : " +
- toLoggableProps(resource, props).mkString(","))
- dynamicConfigHandlers(ConfigType.Broker).
- processConfigChanges(resource.name(), props)
- // When applying a per broker config (not a cluster config), we also
- // reload any associated file. For example, if the ssl.keystore is still
- // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
- // have changed. This doesn't apply to topic configs or cluster configs.
- reloadUpdatedFilesWithoutConfigChange(props)
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
- s"broker with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
- s"in ${deltaName}", t)
- }
- }
- case _ => // nothing to do
- }
- }
- }
+ dynamicConfigPublisher.publish(delta, newImage)
+ // Apply client quotas delta.
try {
- // Apply client quotas delta.
Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
clientQuotaMetadataManager.update(clientQuotasDelta)
}
diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
new file mode 100644
index 00000000000..12ff51d4039
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Properties
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
+import kafka.utils.Logging
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DynamicConfigPublisher(
+ conf: KafkaConfig,
+ faultHandler: FaultHandler,
+ dynamicConfigHandlers: Map[String, ConfigHandler],
+ nodeType: String
+) extends Logging {
+ logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] "
+
+ def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+ try {
+ // Apply configuration deltas.
+ Option(delta.configsDelta()).foreach { configsDelta =>
+ configsDelta.changes().keySet().forEach { resource =>
+ val props = newImage.configs().configProperties(resource)
+ resource.`type`() match {
+ case TOPIC =>
+ dynamicConfigHandlers.get(ConfigType.Topic).foreach(topicConfigHandler =>
+ try {
+ // Apply changes to a topic's dynamic configuration.
+ info(s"Updating topic ${resource.name()} with new configuration : " +
+ toLoggableProps(resource, props).mkString(","))
+ topicConfigHandler.processConfigChanges(resource.name(), props)
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Error updating topic " +
+ s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+ s"in ${deltaName}", t)
+ }
+ )
+ case BROKER =>
+ dynamicConfigHandlers.get(ConfigType.Broker).foreach(nodeConfigHandler =>
+ if (resource.name().isEmpty) {
+ try {
+ // Apply changes to "cluster configs" (also known as default BROKER configs).
+ // These are stored in KRaft with an empty name field.
+ info("Updating cluster configuration : " +
+ toLoggableProps(resource, props).mkString(","))
+ nodeConfigHandler.processConfigChanges(ConfigEntityName.Default, props)
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Error updating " +
+ s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+ s"in ${deltaName}", t)
+ }
+ } else if (resource.name() == conf.nodeId.toString) {
+ try {
+ // Apply changes to this node's dynamic configuration.
+ info(s"Updating node ${conf.nodeId} with new configuration : " +
+ toLoggableProps(resource, props).mkString(","))
+ nodeConfigHandler.processConfigChanges(resource.name(), props)
+ // When applying a per node config (not a cluster config), we also
+ // reload any associated file. For example, if the ssl.keystore is still
+ // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
+ // have changed. This doesn't apply to topic configs or cluster configs.
+ reloadUpdatedFilesWithoutConfigChange(props)
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Error updating " +
+ s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+ s"in ${deltaName}", t)
+ }
+ }
+ )
+ case _ => // nothing to do
+ }
+ }
+ }
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Uncaught exception while " +
+ s"publishing dynamic configuration changes from ${deltaName}", t)
+ }
+ }
+
+ def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
+ conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 8874a235a52..317da428888 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,14 +17,11 @@
package kafka.server.metadata
-import kafka.coordinator.group.GroupCoordinator
-import kafka.coordinator.transaction.TransactionCoordinator
-
import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import kafka.log.{LogManager, UnifiedLog}
-import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
+import kafka.log.UnifiedLog
+import kafka.server.{BrokerServer, KafkaConfig}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -36,7 +33,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
-import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.apache.kafka.server.fault.FaultHandler
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -179,29 +176,15 @@ class BrokerMetadataPublisherTest {
new TopicsImage(idsMap.asJava, namesMap.asJava)
}
- private def newMockPublisher(
+ private def newMockDynamicConfigPublisher(
broker: BrokerServer,
- logManager: LogManager,
- replicaManager: ReplicaManager,
- groupCoordinator: GroupCoordinator,
- txnCoordinator: TransactionCoordinator,
- errorHandler: FaultHandler = new MockFaultHandler("publisher")
- ): BrokerMetadataPublisher = {
- val mockLogManager = Mockito.mock(classOf[LogManager])
- Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
- Mockito.spy(new BrokerMetadataPublisher(
+ errorHandler: FaultHandler
+ ): DynamicConfigPublisher = {
+ Mockito.spy(new DynamicConfigPublisher(
conf = broker.config,
- metadataCache = broker.metadataCache,
- logManager,
- replicaManager,
- groupCoordinator,
- txnCoordinator,
- clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+ faultHandler = errorHandler,
dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
- _authorizer = Option.empty,
- errorHandler,
- errorHandler
- ))
+ nodeType = "broker"))
}
@Test
@@ -215,20 +198,14 @@ class BrokerMetadataPublisherTest {
cluster.startup()
cluster.waitForReadyBrokers()
val broker = cluster.brokers().values().iterator().next()
- val mockLogManager = Mockito.mock(classOf[LogManager])
- Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
- val mockReplicaManager = Mockito.mock(classOf[ReplicaManager])
- val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator])
- val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator])
-
- val publisher = newMockPublisher(broker, mockLogManager, mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator)
+ val publisher = newMockDynamicConfigPublisher(broker, cluster.nonFatalFaultHandler())
val numTimesReloadCalled = new AtomicInteger(0)
Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
thenAnswer(new Answer[Unit]() {
override def answer(invocation: InvocationOnMock): Unit = numTimesReloadCalled.addAndGet(1)
})
- broker.metadataListener.alterPublisher(publisher).get()
+ broker.metadataPublisher.dynamicConfigPublisher = publisher
val admin = Admin.create(cluster.clientProperties())
try {
assertEquals(0, numTimesReloadCalled.get())