You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/04 20:31:19 UTC

[kafka] branch 3.2 updated (89b2bf257b -> a7369bd52f)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a change to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 89b2bf257b MINOR: Update 3.2 branch to 3.2.2-SNAPSHOT
     new 4e049c706f KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
     new a7369bd52f KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaBroker.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   |  45 ++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 174 +++++++++++++--------
 .../unit/kafka/utils/PasswordEncoderTest.scala     |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  25 +--
 .../controller/ConfigurationControlManager.java    |  13 +-
 9 files changed, 185 insertions(+), 95 deletions(-)


[kafka] 02/02: KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a7369bd52ff1e91b3a56d5622ce49c7c515cb81e
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Aug 4 15:09:08 2022 -0400

    KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
---
 .../server/DynamicBrokerReconfigurationTest.scala  | 54 ++++++++++++++--------
 .../controller/ConfigurationControlManager.java    | 13 ++++--
 2 files changed, 44 insertions(+), 23 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c3d1c68c71..a76fa91381 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.nowarn
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
@@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
   }
 
-  @Test // TODO KAFKA-14126 add KRaft support
-  def testKeyStoreAlter(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testKeyStoreAlter(quorum: String): Unit = {
     val topic2 = "testtopic2"
     TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
 
@@ -419,8 +421,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test // TODO KAFKA-14126 add KRaft support
-  def testTrustStoreAlter(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTrustStoreAlter(quorum: String): Unit = {
     val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
 
     // Producer with new keystore should fail to connect before truststore update
@@ -467,9 +470,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
     }
 
+    val group_id = new AtomicInteger(1)
+    def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
+
     // Produce/consume should work with old as well as new client keystore
-    verifySslProduceConsume(sslProperties1, "alter-truststore-1")
-    verifySslProduceConsume(sslProperties2, "alter-truststore-2")
+    verifySslProduceConsume(sslProperties1, next_group_name())
+    verifySslProduceConsume(sslProperties2, next_group_name())
 
     // Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
     val oldTruststoreProps = new Properties
@@ -478,7 +484,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
       (s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
     verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
-    verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+    verifySslProduceConsume(sslProperties1, next_group_name())
 
     // Update same truststore file to contain both certificates without changing any configs.
     // Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
@@ -486,8 +492,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
       StandardCopyOption.REPLACE_EXISTING)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
-    verifySslProduceConsume(sslProperties1, "alter-truststore-4")
-    verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+    TestUtils.retry(30000) {
+      try {
+        verifySslProduceConsume(sslProperties1, next_group_name())
+        verifySslProduceConsume(sslProperties2, next_group_name())
+      } catch {
+        case t: Throwable => throw new AssertionError(t)
+      }
+    }
 
     // Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
     // Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
@@ -495,21 +507,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
     props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
-    verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+    verifySslProduceConsume(sslProperties2, next_group_name())
     props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
-    verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+    verifySslProduceConsume(sslProperties2, next_group_name())
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
 
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
-    val controllerChannelManager = controller.kafkaController.controllerChannelManager
-    val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
-      JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
-    brokerStateInfo(0).networkClient.disconnect("0")
-    TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
-
-    // validate that the brokerToController request works fine
-    verifyBrokerToControllerCall(controller)
+    if (!isKRaftTest()) {
+      val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
+      val controllerChannelManager = controller.kafkaController.controllerChannelManager
+      val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+        JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+      brokerStateInfo(0).networkClient.disconnect("0")
+      TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+      // validate that the brokerToController request works fine
+      verifyBrokerToControllerCall(controller)
+    }
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index a16361343b..746fdf1ffe 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource.Type;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
@@ -146,7 +147,8 @@ public class ConfigurationControlManager {
                     }
                     break;
             }
-            if (!Objects.equals(currentValue, newValue)) {
+            if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+                // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
                 newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
@@ -233,7 +235,8 @@ public class ConfigurationControlManager {
             String key = entry.getKey();
             String newValue = entry.getValue();
             String currentValue = currentConfigs.get(key);
-            if (!Objects.equals(newValue, currentValue)) {
+            if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+                // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
                 newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
                     setResourceType(configResource.type().id()).
                     setResourceName(configResource.name()).
@@ -297,7 +300,11 @@ public class ConfigurationControlManager {
         if (configs.isEmpty()) {
             configData.remove(configResource);
         }
-        log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+        if (configSchema.isSensitive(record)) {
+            log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
+        } else {
+            log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+        }
     }
 
     // VisibleForTesting


[kafka] 01/02: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4e049c706fa741164bc5ad65768236a48f75e64d
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 3 13:28:06 2022 -0400

    KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
    
    Enable some of the dynamic broker reconfiguration tests in KRaft mode
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaBroker.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   |  45 ++++++--
 .../server/DynamicBrokerReconfigurationTest.scala  | 128 +++++++++++++--------
 .../unit/kafka/utils/PasswordEncoderTest.scala     |  10 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  25 ++--
 8 files changed, 145 insertions(+), 76 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 5e5ccefa45..126be3a81a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -212,7 +212,7 @@ object ConfigCommand extends Config {
     encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
     val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
       throw new IllegalArgumentException("Password encoder secret not specified"))
-    new PasswordEncoder(new Password(encoderSecret),
+    PasswordEncoder.encrypting(new Password(encoderSecret),
       None,
       encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
       encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2a4fd9501b..a40444507b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -209,7 +209,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
   private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig: KafkaConfig = null
-  private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+  private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
+    maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+  } else {
+    Some(PasswordEncoder.noop())
+  }
 
   private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
     currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
@@ -338,7 +342,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
 
   private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
    secret.map { secret =>
-      new PasswordEncoder(secret,
+     PasswordEncoder.encrypting(secret,
         kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
         kafkaConfig.passwordEncoderCipherAlgorithm,
         kafkaConfig.passwordEncoderKeyLength,
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index f4c6abc306..d4210b55f4 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -88,6 +88,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
   def shutdown(): Unit
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
+  def clientToControllerChannelManager: BrokerToControllerChannelManager
 
   // For backwards compatibility, we need to keep older metrics tied
   // to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3282ef6c57..be2c8e72b9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1474,6 +1474,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 
   // Cache the current config to avoid acquiring read lock to access from dynamicConfig
   @volatile private var currentConfig = this
+  val processRoles: Set[ProcessRole] = parseProcessRoles()
   private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
 
   private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
@@ -1593,7 +1594,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
-  val processRoles: Set[ProcessRole] = parseProcessRoles()
   val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
   val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
   val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index f748a455c6..3373223e36 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -38,6 +38,33 @@ object PasswordEncoder {
   val IterationsProp = "iterations"
   val EncyrptedPasswordProp = "encryptedPassword"
   val PasswordLengthProp = "passwordLength"
+
+  def encrypting(secret: Password,
+                 keyFactoryAlgorithm: Option[String],
+                 cipherAlgorithm: String,
+                 keyLength: Int,
+                 iterations: Int): EncryptingPasswordEncoder = {
+    new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
+  }
+
+  def noop(): NoOpPasswordEncoder = {
+    new NoOpPasswordEncoder()
+  }
+}
+
+trait PasswordEncoder {
+  def encode(password: Password): String
+  def decode(encodedPassword: String): Password
+
+  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
+}
+
+/**
+ * A password encoder that does not modify the given password. This is used in KRaft mode only.
+ */
+class NoOpPasswordEncoder extends PasswordEncoder {
+  override def encode(password: Password): String = password.value()
+  override def decode(encodedPassword: String): Password = new Password(encodedPassword)
 }
 
 /**
@@ -55,16 +82,18 @@ object PasswordEncoder {
   * The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
   *
   */
-class PasswordEncoder(secret: Password,
-                      keyFactoryAlgorithm: Option[String],
-                      cipherAlgorithm: String,
-                      keyLength: Int,
-                      iterations: Int) extends Logging {
+class EncryptingPasswordEncoder(
+  secret: Password,
+  keyFactoryAlgorithm: Option[String],
+  cipherAlgorithm: String,
+  keyLength: Int,
+  iterations: Int
+) extends PasswordEncoder with Logging {
 
   private val secureRandom = new SecureRandom
   private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
 
-  def encode(password: Password): String = {
+  override def encode(password: Password): String = {
     val salt = new Array[Byte](256)
     secureRandom.nextBytes(salt)
     val cipher = Cipher.getInstance(cipherAlgorithm)
@@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password,
     encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
   }
 
-  def decode(encodedPassword: String): Password = {
+  override def decode(encodedPassword: String): Password = {
     val params = CoreUtils.parseCsvMap(encodedPassword)
     val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
     val cipherAlg = params(CipherAlgorithmProp)
@@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password,
 
   private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
 
-  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-
   private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
     val aesPattern = "AES/(.*)/.*".r
     cipherAlgorithm match {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index dfff9075f8..c3d1c68c71 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -31,7 +31,7 @@ import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.{CleanerConfig, LogConfig}
+import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
 import kafka.message.ProducerCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
 import kafka.network.{Processor, RequestChannel}
@@ -64,6 +64,8 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
 import scala.collection._
@@ -80,7 +82,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
   import DynamicBrokerReconfigurationTest._
 
-  private val servers = new ArrayBuffer[KafkaServer]
+  private val servers = new ArrayBuffer[KafkaBroker]
   private val numServers = 3
   private val numPartitions = 10
   private val producers = new ArrayBuffer[KafkaProducer[String, String]]
@@ -111,15 +113,22 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
     (0 until numServers).foreach { brokerId =>
 
-      val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+      val props = if (isKRaftTest()) {
+        val properties = TestUtils.createBrokerConfig(brokerId, null)
+        properties.put(KafkaConfig.AdvertisedListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+        properties
+      } else {
+        val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
+        properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+        properties
+      }
       props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
       // Ensure that we can support multiple listeners per security protocol and multiple security protocols
       props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
-      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
       props.put(KafkaConfig.SslClientAuthProp, "requested")
       props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
-      props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
       props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
       props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
@@ -138,17 +147,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
 
       val kafkaConfig = KafkaConfig.fromProps(props)
-      configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+      if (!isKRaftTest()) {
+        configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+      }
 
-      servers += TestUtils.createServer(kafkaConfig)
+      servers += createBroker(kafkaConfig)
     }
 
-    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
-      replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
 
+    TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
+    TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
+      numPartitions = servers.head.config.offsetsTopicPartitions,
+      replicationFactor = numServers,
+      topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+
     TestMetricsReporter.testReporters.clear()
   }
 
@@ -166,8 +179,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     closeSasl()
   }
 
-  @Test
-  def testConfigDescribeUsingAdminClient(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConfigDescribeUsingAdminClient(quorum: String): Unit = {
 
     def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean,
                      expectedProps: Properties): Unit = {
@@ -226,9 +240,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val adminClient = adminClients.head
     alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
 
-    val configDesc = describeConfig(adminClient)
-    verifySslConfig("listener.name.external.", sslProperties1, configDesc)
-    verifySslConfig("", invalidSslProperties, configDesc)
+    val configDesc = TestUtils.tryUntilNoAssertionError() {
+      val describeConfigsResult = describeConfig(adminClient)
+      verifySslConfig("listener.name.external.", sslProperties1, describeConfigsResult)
+      verifySslConfig("", invalidSslProperties, describeConfigsResult)
+      describeConfigsResult
+    }
 
     // Verify a few log configs with and without synonyms
     val expectedProps = new Properties
@@ -262,8 +279,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
   }
 
-  @Test
-  def testUpdatesUsingConfigProvider(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testUpdatesUsingConfigProvider(quorum: String): Unit = {
     val PollingIntervalVal = f"$${file:polling.interval:interval}"
     val PollingIntervalUpdateVal = f"$${file:polling.interval:updinterval}"
     val SslTruststoreTypeVal = f"$${file:ssl.truststore.type:storetype}"
@@ -309,11 +327,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
     }
 
-    // fetch from ZK, values should be unresolved
-    val props = fetchBrokerConfigsFromZooKeeper(servers.head)
-    assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
-    assertTrue(props.getProperty(configPrefix+KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
-    assertTrue(props.getProperty(configPrefix+KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+    if (!isKRaftTest()) {
+      // fetch from ZK, values should be unresolved
+      val props = fetchBrokerConfigsFromZooKeeper(servers.head)
+      assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
+      assertTrue(props.getProperty(configPrefix + KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
+      assertTrue(props.getProperty(configPrefix + KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+    }
 
     // verify the update
     // 1. verify update not occurring if the value of property is same.
@@ -332,10 +352,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
   }
 
-  @Test
+  @Test // TODO KAFKA-14126 add KRaft support
   def testKeyStoreAlter(): Unit = {
     val topic2 = "testtopic2"
-    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
 
     // Start a producer and consumer that work with the current broker keystore.
     // This should continue working while changes are made
@@ -399,7 +419,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test
+  @Test // TODO KAFKA-14126 add KRaft support
   def testTrustStoreAlter(): Unit = {
     val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
 
@@ -481,7 +501,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifySslProduceConsume(sslProperties2, "alter-truststore-7")
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
 
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
     val controllerChannelManager = controller.kafkaController.controllerChannelManager
     val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
       JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
@@ -492,8 +512,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyBrokerToControllerCall(controller)
   }
 
-  @Test
-  def testLogCleanerConfig(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLogCleanerConfig(quorum: String): Unit = {
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
@@ -537,13 +558,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     stopAndVerifyProduceConsume(producerThread, consumerThread)
   }
 
-  @Test
-  def testConsecutiveConfigChange(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsecutiveConfigChange(quorum: String): Unit = {
     val topic2 = "testtopic2"
     val topicProps = new Properties
     topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
-    TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
-    var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+
+    // Re-evaluates the first broker's log lookup for `tp` via computeUntilTrue until it is
+    // defined; fails the test with "Log not found" if the lookup never succeeds.
+    def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
+      val (logOpt, found) = TestUtils.computeUntilTrue(servers.head.logManager.getLog(tp))(_.isDefined)
+      assertTrue(found, "Log not found")
+      logOpt.get
+    }
+
+    var log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
 
@@ -558,7 +589,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       }
     }
 
-    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives
 
@@ -566,7 +597,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     props.clear()
     props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
-    log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+    log = getLogOrThrow(new TopicPartition(topic2, 0))
     assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
     assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives
   }
@@ -974,6 +1005,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @Test
+  // Modifying advertised listeners is not supported in KRaft
   def testAdvertisedListenerUpdate(): Unit = {
     val adminClient = adminClients.head
     val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
@@ -994,11 +1026,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }
 
     // Verify that endpoints have been updated in ZK for all brokers
-    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+    servers.foreach { server =>
+      validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => endpoints.contains(invalidHost))
+    }
 
     // Trigger session expiry and ensure that controller registers new advertised listener after expiry
     val controllerEpoch = zkClient.getControllerEpoch
-    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))).asInstanceOf[KafkaServer]
     val controllerZkClient = controllerServer.zkClient
     val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
     sessionExpiringClient.close()
@@ -1022,7 +1056,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       .getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
-    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
+    servers.foreach { server =>
+      validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => !endpoints.contains(invalidHost))
+    }
 
     // Verify that produce/consume work now
     val topic2 = "testtopic2"
@@ -1119,7 +1155,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
   }
 
-  private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+  private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol,
                           saslMechanisms: Seq[String]): Unit = {
     val config = servers.head.config
     val existingListenerCount = config.listeners.size
@@ -1264,11 +1300,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
-  private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+  private def hasListenerMetric(server: KafkaBroker, listenerName: String): Boolean = {
     server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
   }
 
-  private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+  private def fetchBrokerConfigsFromZooKeeper(server: KafkaBroker): Properties = {
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
   }
@@ -1322,7 +1358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     }, "Did not fail authentication with invalid config")
   }
 
-  private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = {
+  private def describeConfig(adminClient: Admin, servers: Seq[KafkaBroker] = this.servers): Config = {
     val configResources = servers.map { server =>
       new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
     }
@@ -1419,7 +1455,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @nowarn("cat=deprecation")
-  private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
+  private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
     val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) -> newConfig).asJava
@@ -1428,7 +1464,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   }
 
   @nowarn("cat=deprecation")
-  private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
+  private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, props: Properties,
                    perBrokerConfig: Boolean): AlterConfigsResult = {
     val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
     val newConfig = new Config(configEntries)
@@ -1507,7 +1543,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
   private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = {
     val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
-    new PasswordEncoder(encoderSecret,
+    PasswordEncoder.encrypting(encoderSecret,
       config.passwordEncoderKeyFactoryAlgorithm,
       config.passwordEncoderCipherAlgorithm,
       config.passwordEncoderKeyLength,
@@ -1518,7 +1554,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
   }
 
-  private def waitForConfigOnServer(server: KafkaServer, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+  private def waitForConfigOnServer(server: KafkaBroker, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
     TestUtils.retry(maxWaitMs) {
       assertEquals(propValue, server.config.originals.get(propName))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
index 0a5d5ac029..50cdceabbc 100755
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -30,7 +30,7 @@ class PasswordEncoderTest {
 
   @Test
   def testEncodeDecode(): Unit = {
-    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       None,
       Defaults.PasswordEncoderCipherAlgorithm,
       Defaults.PasswordEncoderKeyLength,
@@ -54,7 +54,7 @@ class PasswordEncoderTest {
 
   @Test
   def testEncoderConfigChange(): Unit = {
-    val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       Some("PBKDF2WithHmacSHA1"),
       "DES/CBC/PKCS5Padding",
       64,
@@ -68,7 +68,7 @@ class PasswordEncoderTest {
     assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
 
     // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
-    val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+    val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
       Some("PBKDF2WithHmacSHA1"),
       "AES/CBC/PKCS5Padding",
       128,
@@ -76,7 +76,7 @@ class PasswordEncoderTest {
     assertEquals(password, decoder.decode(encoded).value)
 
     // Test that decoding fails if secret is altered
-    val decoder2 = new PasswordEncoder(new Password("secret-2"),
+    val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
       Some("PBKDF2WithHmacSHA1"),
       "AES/CBC/PKCS5Padding",
       128,
@@ -92,7 +92,7 @@ class PasswordEncoderTest {
   def testEncodeDecodeAlgorithms(): Unit = {
 
     def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
-      val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+      val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
         keyFactoryAlg,
         cipherAlg,
         keyLength,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8d921e54f1..e388896c03 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -482,7 +482,7 @@ object TestUtils extends Logging {
                   topic: String,
                   numPartitions: Int = 1,
                   replicationFactor: Int = 1,
-                  servers: Seq[KafkaServer],
+                  servers: Seq[KafkaBroker],
                   topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
@@ -514,7 +514,7 @@ object TestUtils extends Logging {
   def createTopic(zkClient: KafkaZkClient,
                   topic: String,
                   partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
-                  servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
+                  servers: Seq[KafkaBroker]): scala.collection.immutable.Map[Int, Int] = {
     createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties())
   }
 
@@ -526,7 +526,7 @@ object TestUtils extends Logging {
   def createTopic(zkClient: KafkaZkClient,
                   topic: String,
                   partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
-                  servers: Seq[KafkaServer],
+                  servers: Seq[KafkaBroker],
                   topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
     val adminZkClient = new AdminZkClient(zkClient)
     // create topic
@@ -554,7 +554,7 @@ object TestUtils extends Logging {
     * Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated
     * to all brokers.
     */
-  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaServer]): Unit = {
+  def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
     val server = servers.head
     createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
       server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
@@ -1014,18 +1014,19 @@ object TestUtils extends Logging {
    * otherwise difficult to poll for. `computeUntilTrue` and `waitUntilTrue` should be preferred in cases where we can
    * easily wait on a condition before evaluating the assertions.
    */
-  def tryUntilNoAssertionError(waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => Unit) = {
-    val (error, success) = TestUtils.computeUntilTrue({
+  def tryUntilNoAssertionError[T](waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => T): T = {
+    val (outcome, _) = TestUtils.computeUntilTrue({
       try {
-        assertions
-        None
+        // Right carries the value produced by `assertions`; Left carries the AssertionError it threw.
+        Right(assertions)
       } catch {
-        case ae: AssertionError => Some(ae)
+        case ae: AssertionError => Left(ae)
       }
-    }, waitTime = waitTime, pause = pause)(_.isEmpty)
+    }, waitTime = waitTime, pause = pause)(_.isRight)
 
-    if (!success) {
-      throw error.get
+    outcome match {
+      case Right(res) => res
+      case Left(err) => throw err
     }
   }