Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/03 18:00:08 UTC

[kafka] branch 3.3 updated: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new a687d4d3f6 KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
a687d4d3f6 is described below

commit a687d4d3f6874f7821996d644da6df9491bf9232
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 3 13:28:06 2022 -0400

    KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
    
    Enable some of the dynamic broker reconfiguration tests in KRaft mode
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaBroker.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   |  45 ++++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 128 +++++++++++++--------
 .../unit/kafka/utils/PasswordEncoderTest.scala     |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  25 ++--
 8 files changed, 145 insertions(+), 76 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 4676bfd101..9a42f9b874 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -211,7 +211,7 @@ object ConfigCommand extends Logging {
     encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
     val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
       throw new IllegalArgumentException("Password encoder secret not specified"))
-    new PasswordEncoder(new Password(encoderSecret),
+    PasswordEncoder.encrypting(new Password(encoderSecret),
       None,
       encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
       encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 33511147e6..76a42b74fa 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -211,7 +211,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = null
-  private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+  private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
+    maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+  } else {
+    Some(PasswordEncoder.noop())
+  }
 
   private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
     currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
@@ -340,7 +344,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
    secret.map { secret =>
-      new PasswordEncoder(secret,
+     PasswordEncoder.encrypting(secret,
         kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
         kafkaConfig.passwordEncoderCipherAlgorithm,
         kafkaConfig.passwordEncoderKeyLength,
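
The hunk above is the crux of the fix. In ZooKeeper mode (empty process.roles)
dynamic configs are persisted in ZooKeeper, so sensitive values must be
encrypted with the broker's password.encoder.secret; in KRaft mode they live in
the metadata log and pass through unchanged. A minimal, self-contained sketch
of that selection logic (illustrative names, not the Kafka source):

    object EncoderSelectionSketch {
      trait PasswordEncoder {
        def encode(password: String): String
        def decode(encoded: String): String
      }

      // Stand-in for PasswordEncoder.noop(): KRaft passes values through unchanged.
      class NoOpEncoder extends PasswordEncoder {
        override def encode(password: String): String = password
        override def decode(encoded: String): String = encoded
      }

      // Stand-in for PasswordEncoder.encrypting(...); real cipher logic elided.
      class EncryptingEncoder(secret: String) extends PasswordEncoder {
        override def encode(password: String): String = s"<$password encrypted with $secret>"
        override def decode(encoded: String): String = "<decode elided in this sketch>"
      }

      def selectEncoder(processRoles: Set[String], secret: Option[String]): Option[PasswordEncoder] =
        if (processRoles.isEmpty) secret.map(new EncryptingEncoder(_)) // ZooKeeper mode
        else Some(new NoOpEncoder)                                     // KRaft mode

      def main(args: Array[String]): Unit = {
        println(selectEncoder(Set.empty, Some("s3cret")))  // ZK: encrypting encoder
        println(selectEncoder(Set("broker"), None))        // KRaft: no-op encoder
      }
    }
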
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 46f2e7e8b1..b02b1167c5 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -89,6 +89,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
   def shutdown(): Unit
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
+  def clientToControllerChannelManager: BrokerToControllerChannelManager
 
   // For backwards compatibility, we need to keep older metrics tied
   // to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a9fbda6c21..4e253047ee 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1493,6 +1493,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 
   // Cache the current config to avoid acquiring read lock to access from dynamicConfig
   @volatile private var currentConfig = this
+  val processRoles: Set[ProcessRole] = parseProcessRoles()
   private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
 
   private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
@@ -1612,7 +1613,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
-  val processRoles: Set[ProcessRole] = parseProcessRoles()
   val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
   val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
   val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
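
The reorder above matters because Scala initializes class-body vals in
declaration order, and the dynamicConfig val constructs a
DynamicBrokerConfig(this) that now reads processRoles (see the
DynamicBrokerConfig hunk earlier); declaring processRoles first ensures it is
already set when that constructor runs. A self-contained illustration of the
pitfall, assuming nothing from the Kafka source:

    object InitOrderSketch {
      class Config(roleString: String) {
        // Declared first, so `dependent` observes the real value below.
        val processRoles: Set[String] = roleString.split(",").filter(_.nonEmpty).toSet
        // Stand-in for `dynamicConfig`, which reads processRoles during construction.
        val dependent: String = s"roles seen at construction: $processRoles"
        // Swapping the two declarations makes `dependent` observe null instead:
        // exactly the class of bug the reordering in KafkaConfig avoids.
      }

      def main(args: Array[String]): Unit = {
        println(new Config("broker,controller").dependent)
        // prints: roles seen at construction: Set(broker, controller)
      }
    }
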
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index f748a455c6..3373223e36 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -38,6 +38,33 @@ object PasswordEncoder {
   val IterationsProp = "iterations"
   val EncyrptedPasswordProp = "encryptedPassword"
   val PasswordLengthProp = "passwordLength"
+
+  def encrypting(secret: Password,
+                 keyFactoryAlgorithm: Option[String],
+                 cipherAlgorithm: String,
+                 keyLength: Int,
+                 iterations: Int): EncryptingPasswordEncoder = {
+    new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
+  }
+
+  def noop(): NoOpPasswordEncoder = {
+    new NoOpPasswordEncoder()
+  }
+}
+
+trait PasswordEncoder {
+  def encode(password: Password): String
+  def decode(encodedPassword: String): Password
+
+  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
+}
+
+/**
+ * A password encoder that does not modify the given password. This is used in KRaft mode only.
+ */
+class NoOpPasswordEncoder extends PasswordEncoder {
+  override def encode(password: Password): String = password.value()
+  override def decode(encodedPassword: String): Password = new Password(encodedPassword)
 }
 
 /**
@@ -55,16 +82,18 @@ object PasswordEncoder {
   * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
   *
   */
-class PasswordEncoder(secret: Password,
-                      keyFactoryAlgorithm: Option[String],
-                      cipherAlgorithm: String,
-                      keyLength: Int,
-                      iterations: Int) extends Logging {
+class EncryptingPasswordEncoder(
+  secret: Password,
+  keyFactoryAlgorithm: Option[String],
+  cipherAlgorithm: String,
+  keyLength: Int,
+  iterations: Int
+) extends PasswordEncoder with Logging {
 
   private val secureRandom = new SecureRandom
   private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
 
-  def encode(password: Password): String = {
+  override def encode(password: Password): String = {
     val salt = new Array[Byte](256)
     secureRandom.nextBytes(salt)
     val cipher = Cipher.getInstance(cipherAlgorithm)
@@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password,
     encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
   }
 
-  def decode(encodedPassword: String): Password = {
+  override def decode(encodedPassword: String): Password = {
     val params = CoreUtils.parseCsvMap(encodedPassword)
     val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
     val cipherAlg = params(CipherAlgorithmProp)
@@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password,
 
   private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
 
-  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-
   private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
     val aesPattern = "AES/(.*)/.*".r
     cipherAlgorithm match {
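
A short usage sketch against the two factories introduced above (assumes the
kafka-core classpath; the cipher parameters shown mirror what I understand the
Defaults to be and may need adjusting):

    import kafka.utils.PasswordEncoder
    import org.apache.kafka.common.config.types.Password

    object PasswordEncoderUsageSketch {
      def main(args: Array[String]): Unit = {
        val encrypting = PasswordEncoder.encrypting(
          new Password("password-encoder-secret"),
          None,                    // key factory algorithm: let the encoder pick a default
          "AES/CBC/PKCS5Padding",  // cipher algorithm (assumed default)
          128,                     // key length (assumed default)
          4096)                    // iterations (assumed default)
        val encoded = encrypting.encode(new Password("hunter2"))
        assert(encrypting.decode(encoded).value == "hunter2") // round-trips via the cipher

        // KRaft mode: values pass through untouched.
        val noop = PasswordEncoder.noop()
        assert(noop.encode(new Password("hunter2")) == "hunter2")
      }
    }
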
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index ccfe63e7b5..c7be8ce831 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -31,7 +31,7 @@ import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.{CleanerConfig, LogConfig}
+import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
 import kafka.message.ProducerCompressionCodec
 import kafka.network.{Processor, RequestChannel}
 import kafka.utils._
@@ -64,6 +64,8 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
 import scala.collection._
@@ -80,7 +82,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
   import DynamicBrokerReconfigurationTest._
 
-  private val servers = new ArrayBuffer[KafkaServer]
+  private val servers = new ArrayBuffer[KafkaBroker]
   private val numServers = 3
   private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
@@ -111,15 +113,22 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
     (0 until numServers).foreach { brokerId =>
 
-      val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+      val props = if (isKRaftTest()) {
+        val properties = TestUtils.createBrokerConfig(brokerId, null)
+        properties.put(KafkaConfig.AdvertisedListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+        properties
+      } else {
+        val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
+        properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+        properties
+      }
       props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
       // Ensure that we can support multiple listeners per security protocol and multiple security protocols
       props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
       props.put(KafkaConfig.SslClientAuthProp, "requested")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
-      props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
       props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
       props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
@@ -138,17 +147,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
 
       val kafkaConfig = KafkaConfig.fromProps(props)
-      configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+      if (!isKRaftTest()) {
+        configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+      }
 
-      servers += TestUtils.createServer(kafkaConfig)
+      servers += createBroker(kafkaConfig)
     }
 
-    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
-      replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
 
+    TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
+    TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
+      numPartitions = servers.head.config.offsetsTopicPartitions,
+      replicationFactor = numServers,
+      topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+
     TestMetricsReporter.testReporters.clear()
   }
 
@@ -166,8 +179,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     closeSasl()
   }
 
-  @Test
-  def testConfigDescribeUsingAdminClient(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConfigDescribeUsingAdminClient(quorum: String): Unit = {
 
     def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean,
                      expectedProps: Properties): Unit = {
@@ -226,9 +240,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val adminClient = adminClients.head
     alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
 
-    val configDesc = describeConfig(adminClient)
-    verifySslConfig("listener.name.external.", sslProperties1, configDesc)
-    verifySslConfig("", invalidSslProperties, configDesc)
+    val configDesc = TestUtils.tryUntilNoAssertionError() {
+      val describeConfigsResult = describeConfig(adminClient)
+      verifySslConfig("listener.name.external.", sslProperties1, describeConfigsResult)
+      verifySslConfig("", invalidSslProperties, describeConfigsResult)
+      describeConfigsResult
+    }
 
     // Verify a few log configs with and without synonyms
     val expectedProps = new Properties
@@ -262,8 +279,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
   }
 
-  @Test
-  def testUpdatesUsingConfigProvider(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUpdatesUsingConfigProvider(quorum: String): Unit = {
     val PollingIntervalVal = f"$${file:polling.interval:interval}"
     val PollingIntervalUpdateVal = f"$${file:polling.interval:updinterval}"
     val SslTruststoreTypeVal = f"$${file:ssl.truststore.type:storetype}"
@@ -309,11 +327,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
     }
 
-    // fetch from ZK, values should be unresolved
-    val props = fetchBrokerConfigsFromZooKeeper(servers.head)
-    assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
-    assertTrue(props.getProperty(configPrefix+KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
-    assertTrue(props.getProperty(configPrefix+KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+    if (!isKRaftTest()) {
+      // fetch from ZK, values should be unresolved
+      val props = fetchBrokerConfigsFromZooKeeper(servers.head)
+      assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
+      assertTrue(props.getProperty(configPrefix + KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
+      assertTrue(props.getProperty(configPrefix + KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+    }
 
     // verify the update
     // 1. verify update not occurring if the value of property is same.
@@ -332,10 +352,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
   }
 
-  @Test
+  @Test // TODO KAFKA-14126 add KRaft support
   def testKeyStoreAlter(): Unit = {
     val topic2 = "testtopic2"
-    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
 
     // Start a producer and consumer that work with the current broker keystore.
     // This should continue working while changes are made
@@ -399,7 +419,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test
+  @Test // TODO KAFKA-14126 add KRaft support
   def testTrustStoreAlter(): Unit = {
     val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
 
@@ -481,7 +501,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifySslProduceConsume(sslProperties2, "alter-truststore-7")
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
 
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
     val controllerChannelManager = controller.kafkaController.controllerChannelManager
     val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
       JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
@@ -492,8 +512,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyBrokerToControllerCall(controller)
   }
 
-  @Test
-  def testLogCleanerConfig(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogCleanerConfig(quorum: String): Unit = {
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
@@ -537,13 +558,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test
-  def testConsecutiveConfigChange(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsecutiveConfigChange(quorum: String): Unit = {
     val topic2 = "testtopic2"
     val topicProps = new Properties
     topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
-    TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
-    var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+
+    def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
+      var (logOpt, found) = TestUtils.computeUntilTrue {
+        servers.head.logManager.getLog(tp)
+      }(_.isDefined)
+      assertTrue(found, "Log not found")
+      logOpt.get
+    }
+
+    var log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
 
@@ -558,7 +589,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       }
     }
 
-    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives
 
@@ -566,7 +597,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.clear()
     props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
-    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives
   }
@@ -974,6 +1005,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @Test
+  // Modifying advertised listeners is not supported in KRaft
   def testAdvertisedListenerUpdate(): Unit = {
     val adminClient = adminClients.head
     val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
@@ -994,11 +1026,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
 
     // Verify that endpoints have been updated in ZK for all brokers
-    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+    servers.foreach { server =>
+      validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => endpoints.contains(invalidHost))
+    }
 
     // Trigger session expiry and ensure that controller registers new advertised listener after expiry
     val controllerEpoch = zkClient.getControllerEpoch
-    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))).asInstanceOf[KafkaServer]
     val controllerZkClient = controllerServer.zkClient
     val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
     sessionExpiringClient.close()
@@ -1022,7 +1056,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       .getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
-    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
+    servers.foreach { server =>
+      validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => !endpoints.contains(invalidHost))
+    }
 
     // Verify that produce/consume work now
     val topic2 = "testtopic2"
@@ -1119,7 +1155,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
   }
 
-  private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+  private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol,
                           saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config
     val existingListenerCount = config.listeners.size
@@ -1264,11 +1300,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
-  private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+  private def hasListenerMetric(server: KafkaBroker, listenerName: String): Boolean = {
     server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
   }
 
-  private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+  private def fetchBrokerConfigsFromZooKeeper(server: KafkaBroker): Properties = {
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
   }
@@ -1322,7 +1358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }, "Did not fail authentication with invalid config")
   }
 
-  private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = {
+  private def describeConfig(adminClient: Admin, servers: Seq[KafkaBroker] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
     }
@@ -1419,7 +1455,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @nowarn("cat=deprecation")
-  private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
+  private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
     val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) -> newConfig).asJava
@@ -1428,7 +1464,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @nowarn("cat=deprecation")
-  private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
+  private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, props: Properties,
                    perBrokerConfig: Boolean): AlterConfigsResult = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1507,7 +1543,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
   private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = {
     val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
-    new PasswordEncoder(encoderSecret,
+    PasswordEncoder.encrypting(encoderSecret,
       config.passwordEncoderKeyFactoryAlgorithm,
       config.passwordEncoderCipherAlgorithm,
       config.passwordEncoderKeyLength,
@@ -1518,7 +1554,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
   }
 
-  private def waitForConfigOnServer(server: KafkaServer, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+  private def waitForConfigOnServer(server: KafkaBroker, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
     TestUtils.retry(maxWaitMs) {
       assertEquals(propValue, server.config.originals.get(propName))
     }
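
For reference, the ParameterizedTest/ValueSource pattern adopted throughout the
test file above looks like this in isolation; QuorumTestHarness reads the
quorum name back out of the test's display name to decide whether to stand up
an embedded ZooKeeper or a KRaft controller. The literal name template is my
assumption of what TestInfoUtils.TestWithParameterizedQuorumName expands to:

    import org.junit.jupiter.params.ParameterizedTest
    import org.junit.jupiter.params.provider.ValueSource

    class QuorumParameterizationSketch {
      // "{displayName}.quorum={0}" is assumed; the real template lives in TestInfoUtils.
      @ParameterizedTest(name = "{displayName}.quorum={0}")
      @ValueSource(strings = Array("zk", "kraft"))
      def testSomething(quorum: String): Unit = {
        assert(quorum == "zk" || quorum == "kraft")
      }
    }
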
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
index 0a5d5ac029..50cdceabbc 100755
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -30,7 +30,7 @@ class PasswordEncoderTest {
 
   @Test
   def testEncodeDecode(): Unit = {
-    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       None,
       Defaults.PasswordEncoderCipherAlgorithm,
       Defaults.PasswordEncoderKeyLength,
@@ -54,7 +54,7 @@ class PasswordEncoderTest {
 
   @Test
   def testEncoderConfigChange(): Unit = {
-    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       Some("PBKDF2WithHmacSHA1"),
       "DES/CBC/PKCS5Padding",
       64,
@@ -68,7 +68,7 @@ class PasswordEncoderTest {
     assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
 
     // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
-    val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       Some("PBKDF2WithHmacSHA1"),
       "AES/CBC/PKCS5Padding",
       128,
@@ -76,7 +76,7 @@ class PasswordEncoderTest {
     assertEquals(password, decoder.decode(encoded).value)
 
     // Test that decoding fails if secret is altered
-    val decoder2 = new PasswordEncoder(new Password("secret-2"),
+    val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
       Some("PBKDF2WithHmacSHA1"),
       "AES/CBC/PKCS5Padding",
       128,
@@ -92,7 +92,7 @@ class PasswordEncoderTest {
   def testEncodeDecodeAlgorithms(): Unit = {
 
     def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
-      val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+      val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
         keyFactoryAlg,
         cipherAlg,
         keyLength,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c49a7bdde0..d0266bdee9 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -511,7 +511,7 @@ object TestUtils extends Logging {
                   topic: String,
                   numPartitions: Int = 1,
                   replicationFactor: Int = 1,
-                  servers: Seq[KafkaServer],
+                  servers: Seq[KafkaBroker],
                   topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
@@ -543,7 +543,7 @@ object TestUtils extends Logging {
   def createTopic(zkClient: KafkaZkClient,
                   topic: String,
                   partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
-                  servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
+                  servers: Seq[KafkaBroker]): scala.collection.immutable.Map[Int, Int] = {
     createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties())
   }
 
@@ -555,7 +555,7 @@ object TestUtils extends Logging {
   def createTopic(zkClient: KafkaZkClient,
                   topic: String,
                   partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
-                  servers: Seq[KafkaServer],
+                  servers: Seq[KafkaBroker],
                   topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
@@ -583,7 +583,7 @@ object TestUtils extends Logging {
     * Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated
     * to all brokers.
     */
-  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaServer]): Unit = {
+  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
     createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
       server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
@@ -1043,18 +1043,19 @@ object TestUtils extends Logging {
    * otherwise difficult to poll for. `computeUntilTrue` and `waitUntilTrue` should be preferred in cases where we can
    * easily wait on a condition before evaluating the assertions.
    */
-  def tryUntilNoAssertionError(waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => Unit) = {
-    val (error, success) = TestUtils.computeUntilTrue({
+  def tryUntilNoAssertionError[T](waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => T): T = {
+    val (either, success) = TestUtils.computeUntilTrue({
       try {
-        assertions
-        None
+        val res = assertions
+        Left(res)
       } catch {
-        case ae: AssertionError => Some(ae)
+        case ae: AssertionError => Right(ae)
       }
-    }, waitTime = waitTime, pause = pause)(_.isEmpty)
+    }, waitTime = waitTime, pause = pause)(_.isLeft)
 
-    if (!success) {
-      throw error.get
+    either match {
+      case Left(res) => res
+      case Right(err) => throw err
     }
   }
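
The refactor above changes tryUntilNoAssertionError from Unit-returning to
value-returning, which is what lets the testConfigDescribeUsingAdminClient hunk
capture describeConfig's result while still retrying on assertion failures. A
small usage sketch (the computation is a stand-in):

    import kafka.utils.TestUtils
    import org.junit.jupiter.api.Assertions.assertEquals

    object TryUntilUsageSketch {
      def main(args: Array[String]): Unit = {
        val result: Int = TestUtils.tryUntilNoAssertionError() {
          val computed = 21 * 2       // stand-in for e.g. describeConfig(adminClient)
          assertEquals(42, computed)  // block is retried until assertions stop throwing
          computed                    // on success, the block's value is returned
        }
        println(result)
      }
    }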