You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/24 22:47:35 UTC

[kafka] 02/02: MINOR; Fix error message when validating KRaft config (#12717)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e8c6a7bb69fe30cdcfb41cb648276083f74f65ba
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Oct 7 10:40:55 2022 -0700

    MINOR; Fix error message when validating KRaft config (#12717)
    
    The error message reported when advertised.listeners is set on a
    controller-only node is confusing. When the KRaft server is configured
    as controller-only, all of the following must be true:
    
    1. `advertised.listeners` is not set
    2. `listeners` contains a listener for every name in `controller.listener.names`
    3. `controller.listener.names` contains a name for every listener in `listeners`
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Igor Soarez <i...@soarez.me>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  18 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 524 +++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 3 files changed, 285 insertions(+), 264 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2df40e97b72..ca7c474fdcf 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2162,14 +2162,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       // KRaft controller-only
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
-      // advertised listeners must be empty when not also running the broker role
-      val sourceOfAdvertisedListeners: String =
-        if (getString(KafkaConfig.AdvertisedListenersProp) != null)
-          s"${KafkaConfig.AdvertisedListenersProp}"
-        else
-          s"${KafkaConfig.ListenersProp}"
-      require(effectiveAdvertisedListeners.isEmpty,
-        s"The $sourceOfAdvertisedListeners config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+      // advertised listeners must be empty when only the controller is configured
+      require(
+        getString(KafkaConfig.AdvertisedListenersProp) == null,
+        s"The ${KafkaConfig.AdvertisedListenersProp} config must be empty when ${KafkaConfig.ProcessRolesProp}=controller"
+      )
+      // listeners should only contain listeners also enumerated in the controller listener
+      require(
+        effectiveAdvertisedListeners.isEmpty,
+        s"The ${KafkaConfig.ListenersProp} config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller"
+      )
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateControllerListenerExistsForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 78adff5f8fc..b9eee175503 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -49,7 +49,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -58,7 +58,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeMinutesProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -67,7 +67,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -84,8 +84,8 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
-    props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+    props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -94,8 +94,8 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "10")
+    props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "10")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -109,9 +109,9 @@ class KafkaConfigTest {
     val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
     val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
 
-    props1.put("log.retention.ms", "-1")
-    props2.put("log.retention.minutes", "-1")
-    props3.put("log.retention.hours", "-1")
+    props1.setProperty("log.retention.ms", "-1")
+    props2.setProperty("log.retention.minutes", "-1")
+    props3.setProperty("log.retention.hours", "-1")
 
     val cfg1 = KafkaConfig.fromProps(props1)
     val cfg2 = KafkaConfig.fromProps(props2)
@@ -120,13 +120,13 @@ class KafkaConfigTest {
     assertEquals(-1, cfg2.logRetentionTimeMillis, "Should be -1")
     assertEquals(-1, cfg3.logRetentionTimeMillis, "Should be -1")
 
-    props4.put("log.retention.ms", "-1")
-    props4.put("log.retention.minutes", "30")
+    props4.setProperty("log.retention.ms", "-1")
+    props4.setProperty("log.retention.minutes", "30")
 
     val cfg4 = KafkaConfig.fromProps(props4)
     assertEquals(-1, cfg4.logRetentionTimeMillis, "Should be -1")
 
-    props5.put("log.retention.ms", "0")
+    props5.setProperty("log.retention.ms", "0")
 
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props5))
   }
@@ -137,9 +137,9 @@ class KafkaConfigTest {
     val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
 
-    props1.put("log.retention.ms", "0")
-    props2.put("log.retention.minutes", "0")
-    props3.put("log.retention.hours", "0")
+    props1.setProperty("log.retention.ms", "0")
+    props2.setProperty("log.retention.minutes", "0")
+    props3.setProperty("log.retention.hours", "0")
 
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
@@ -152,9 +152,9 @@ class KafkaConfigTest {
     val port = 9999
     val hostName = "fake-host"
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
     val serverConfig = KafkaConfig.fromProps(props)
 
     val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -170,7 +170,7 @@ class KafkaConfigTest {
     val advertisedPort = 1234
 
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
 
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -183,35 +183,35 @@ class KafkaConfigTest {
   @Test
   def testDuplicateListeners(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
     // listeners with duplicate port
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
     assertBadConfigContainingMessage(props, "Each listener must have a different port")
 
     // listeners with duplicate name
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
     assertBadConfigContainingMessage(props, "Each listener must have a different name")
 
     // advertised listeners can have duplicate ports
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
-    props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
-    props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "HOST")
+    props.setProperty(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
     KafkaConfig.fromProps(props)
 
     // but not duplicate names
-    props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
     assertBadConfigContainingMessage(props, "Each listener must have a different name")
   }
 
   @Test
   def testControlPlaneListenerName(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
-    props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put("control.plane.listener.name", "CONTROLLER")
+    props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty("control.plane.listener.name", "CONTROLLER")
     KafkaConfig.fromProps(props)
 
     val serverConfig = KafkaConfig.fromProps(props)
@@ -234,12 +234,12 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerNames(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
 
     val serverConfig = KafkaConfig.fromProps(props)
     val controllerEndpoints = serverConfig.controllerListeners
@@ -253,12 +253,12 @@ class KafkaConfigTest {
   @Test
   def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    props.put(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
@@ -270,119 +270,119 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerDefinedForKRaftController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertBadConfigContainingMessage(props, "The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
 
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testControllerListenerDefinedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "controller.listener.names must contain at least one value when running KRaft with just the broker role")
 
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller,broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
     KafkaConfig.fromProps(props)
 
     // change each of the 4 ports to port 5555 -- should pass in all circumstances since we can't validate the
     // controller.quorum.voters ports (which are the ports that clients use and are semantically "advertised" ports
     // even though the controller configuration doesn't list them in advertised.listeners) against the
     // listener ports (which are semantically different then the ports that clients use).
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testSeparateControllerListenerDefinedForKRaftBrokerController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
 
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testControllerListenerNameMapsToPlaintextByDefaultForKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     val controllerListenerName = new ListenerName("CONTROLLER")
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when there is a SSL or SASL controller listener
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
     val controllerNotFoundInMapMessage = "Controller listener with name CONTROLLER defined in controller.listener.names not found in listener.security.protocol.map"
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     // ensure we don't map it to PLAINTEXT when there is a SSL or SASL listener
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9092")
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     props.remove(KafkaConfig.ListenersProp)
     // ensure we don't map it to PLAINTEXT when it is explicitly mapped otherwise
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
     assertEquals(Some(SecurityProtocol.SSL),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when anything is explicitly given
     // (i.e. it is only part of the default value, even with KRaft)
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     // ensure we can map it to a non-PLAINTEXT security protocol by default (i.e. when nothing is given)
     props.remove(KafkaConfig.ListenerSecurityProtocolMapProp)
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     assertEquals(Some(SecurityProtocol.SSL),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("SSL")))
   }
@@ -390,11 +390,11 @@ class KafkaConfigTest {
   @Test
   def testMultipleControllerListenerNamesMapToPlaintextByDefaultForKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER1")))
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
@@ -404,22 +404,22 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
     assertBadConfigContainingMessage(props,
       "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
     // Valid now
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
     assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
   }
 
   @Test
   def testBadListenerProtocol(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
 
     assertFalse(isValidKafkaConfig(props))
   }
@@ -427,12 +427,12 @@ class KafkaConfigTest {
   @Test
   def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
     val config = KafkaConfig.fromProps(props)
     val expectedListeners = Seq(
       EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
@@ -451,13 +451,13 @@ class KafkaConfigTest {
   @Test
   def testListenerAndAdvertisedListenerNames(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+    props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
     val config = KafkaConfig.fromProps(props)
 
     val expectedListeners = Seq(
@@ -482,43 +482,43 @@ class KafkaConfigTest {
   @Test
   def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testCaseInsensitiveListenerProtocol(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
     val config = KafkaConfig.fromProps(props)
     assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
     assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
@@ -531,8 +531,8 @@ class KafkaConfigTest {
   @Test
   def testListenerDefaults(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
     // configuration with no listeners
     val conf = KafkaConfig.fromProps(props)
@@ -545,21 +545,21 @@ class KafkaConfigTest {
   @Test
   def testVersionConfiguration(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
     val conf = KafkaConfig.fromProps(props)
     assertEquals(MetadataVersion.latest, conf.interBrokerProtocolVersion)
 
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
     // We need to set the message format version to make the configuration valid.
-    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
+    props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
     val conf2 = KafkaConfig.fromProps(props)
     assertEquals(IBP_0_8_2, conf2.interBrokerProtocolVersion)
 
     // check that 0.8.2.0 is the same as 0.8.2.1
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
     // We need to set the message format version to make the configuration valid
-    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
+    props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
     val conf3 = KafkaConfig.fromProps(props)
     assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion)
 
@@ -587,7 +587,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionDisabled(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
@@ -596,7 +596,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionEnabled(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@@ -605,7 +605,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionInvalid(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
 
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
@@ -613,7 +613,7 @@ class KafkaConfigTest {
   @Test
   def testLogRollTimeMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -622,8 +622,8 @@ class KafkaConfigTest {
   @Test
   def testLogRollTimeBothMsAndHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
-    props.put(KafkaConfig.LogRollTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRollTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -648,7 +648,7 @@ class KafkaConfigTest {
   @Test
   def testValidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put("compression.type", "gzip")
+    props.setProperty("compression.type", "gzip")
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.compressionType, "gzip")
@@ -657,34 +657,34 @@ class KafkaConfigTest {
   @Test
   def testInvalidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.CompressionTypeProp, "abc")
+    props.setProperty(KafkaConfig.CompressionTypeProp, "abc")
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidInterBrokerSecurityProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:0")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testEqualAdvertisedListenersProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testInvalidAdvertisedListenersProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
     assertBadConfigContainingMessage(props, "No security protocol defined for listener TRACE")
 
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
     assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
   }
 
@@ -693,8 +693,8 @@ class KafkaConfigTest {
   def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
     def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: MetadataVersion): KafkaConfig = {
       val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-      props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
-      props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+      props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+      props.setProperty(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
       KafkaConfig.fromProps(props)
     }
 
@@ -718,7 +718,7 @@ class KafkaConfigTest {
   def testFromPropsInvalid(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
       validRequiredProperties
     }
     // to ensure a basis is valid - bootstraps all needed validation
@@ -988,7 +988,7 @@ class KafkaConfigTest {
   def testDynamicLogConfigs(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
       validRequiredProperties
     }
 
@@ -997,7 +997,7 @@ class KafkaConfigTest {
 
     def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = {
       val initial = accessor()
-      props.put(property, value.toString)
+      props.setProperty(property, value.toString)
       config.updateCurrentConfig(new KafkaConfig(props))
       assertNotEquals(initial, accessor())
     }
@@ -1069,23 +1069,23 @@ class KafkaConfigTest {
   @Test
   def testSpecificProperties(): Unit = {
     val defaults = new Properties()
-    defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
-    defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
-    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
-    defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
-    defaults.put(KafkaConfig.BrokerIdProp, "1")
-    defaults.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
-    defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
-    defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
-    defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
-    defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
-    defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
+    defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
+    defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
+    defaults.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
+    defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
+    defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
+    defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12")
+    defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11")
+    defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10")
     //For LogFlushIntervalMsProp
-    defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
-    defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
+    defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+    defaults.setProperty(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
     // For MetricRecordingLevelProp
-    defaults.put(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
+    defaults.setProperty(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
 
     val config = KafkaConfig.fromProps(defaults)
     assertEquals("127.0.0.1:2181", config.zkConnect)
@@ -1107,7 +1107,7 @@ class KafkaConfigTest {
     assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
     assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
 
-    defaults.put(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
+    defaults.setProperty(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
     val config1 = KafkaConfig.fromProps(defaults)
     assertEquals(true, config1.tokenAuthEnabled)
   }
@@ -1115,19 +1115,19 @@ class KafkaConfigTest {
   @Test
   def testNonroutableAdvertisedListeners(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
+    props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testMaxConnectionsPerIpProp(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.MaxConnectionsPerIpProp, "0")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
     assertFalse(isValidKafkaConfig(props))
-    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1145,19 +1145,19 @@ class KafkaConfigTest {
   @Test
   def testDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
 
     // invalid due to extra listener also appearing in controller listeners
     assertBadConfigContainingMessage(props,
       "controller.listener.names must not contain a value appearing in the 'listeners' configuration when running KRaft with just the broker role")
 
     // Valid now
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
     KafkaConfig.fromProps(props)
 
     // Also valid if we let advertised listeners be derived from listeners/controller.listener.names
@@ -1169,19 +1169,19 @@ class KafkaConfigTest {
   @Test
   def testControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
     assertBadConfigContainingMessage(props,
       "The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role")
 
     // Valid now
-    props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
     KafkaConfig.fromProps(props)
 
     // Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
@@ -1191,28 +1191,44 @@ class KafkaConfigTest {
 
   @Test
   def testAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {
-    val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
-    val expectedExceptionContainsTextSuffix = " config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller"
-    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
-
-    // Still invalid due to extra listener if we set advertised listeners explicitly to be correct
+    // Test that advertised.listeners cannot be set when the KRaft server is controller only.
+    // Test that listeners must contain a listener for every name in controller.listener.names.
+    // Test that controller.listener.names must contain a name for every listener in listeners.
     val correctListeners = "PLAINTEXT://A:9092,SSL://B:9093"
-    props.put(KafkaConfig.AdvertisedListenersProp, correctListeners)
-    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+    val incorrectListeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
 
-    // Still invalid due to extra listener if we allow advertised listeners to derive from listeners/controller.listener.names
+    val correctControllerListenerNames = "PLAINTEXT,SSL"
+
+    val props = new Properties()
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, incorrectListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    var expectedExceptionContainsText = "The advertised.listeners config must be empty when process.roles=controller"
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Still invalid if advertised listeners is explicitly set, even to the correct listeners
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, correctListeners)
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Invalid if listeners contains names not in controller.listener.names
     props.remove(KafkaConfig.AdvertisedListenersProp)
-    assertBadConfigContainingMessage(props, "The listeners" + expectedExceptionContainsTextSuffix)
+    props.setProperty(KafkaConfig.ListenersProp, incorrectListeners)
+    expectedExceptionContainsText = """The listeners config must only contain KRaft controller listeners from
+    |controller.listener.names when process.roles=controller""".stripMargin.replaceAll("\n", " ")
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Invalid if listeners doesn't contain every name in controller.listener.names
+    props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames + ",SASL_SSL")
+    expectedExceptionContainsText = """controller.listener.names must only contain values appearing in the 'listeners'
+    |configuration when running the KRaft controller role""".stripMargin.replaceAll("\n", " ")
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
 
     // Valid now
-    props.put(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
     KafkaConfig.fromProps(props)
   }
 
@@ -1244,7 +1260,7 @@ class KafkaConfigTest {
 
   private def assertInvalidQuorumVoters(value: String): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.QuorumVotersProp, value)
+    props.setProperty(KafkaConfig.QuorumVotersProp, value)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -1269,7 +1285,7 @@ class KafkaConfigTest {
 
   private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.QuorumVotersProp, value)
+    props.setProperty(KafkaConfig.QuorumVotersProp, value)
     val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
@@ -1281,11 +1297,11 @@ class KafkaConfigTest {
     // and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
     val largeBrokerId = 2000
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.NodeIdProp, largeBrokerId.toString)
     KafkaConfig.fromProps(props)
   }
 
@@ -1293,7 +1309,7 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1301,7 +1317,7 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1309,9 +1325,9 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1323,8 +1339,8 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1333,8 +1349,8 @@ class KafkaConfigTest {
     // -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
     val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
     KafkaConfig.fromProps(props)
   }
 
@@ -1344,10 +1360,10 @@ class KafkaConfigTest {
     val negativeTwoNodeId = -2
     val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
-    props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
+    props.setProperty(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1358,9 +1374,9 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     KafkaConfig.fromProps(props)
   }
 
@@ -1369,27 +1385,27 @@ class KafkaConfigTest {
     // -1 is the default for both node.id and broker.id
     val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
   }
 
@@ -1399,12 +1415,12 @@ class KafkaConfigTest {
     val dataDir = "/path/to/data/dir"
 
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
-    props.put(KafkaConfig.LogDirProp, dataDir)
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir)
+    props.setProperty(KafkaConfig.LogDirProp, dataDir)
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
@@ -1418,11 +1434,11 @@ class KafkaConfigTest {
     val dataDir2 = "/path/to/data/dir/2"
 
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
@@ -1478,7 +1494,7 @@ class KafkaConfigTest {
   def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.setProperty(KafkaConfig.BrokerIdProp, "3")
     props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     val config = KafkaConfig.fromProps(props)
@@ -1500,7 +1516,7 @@ class KafkaConfigTest {
 
   @Test
   def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
-    val props = new Properties(kraftProps())
+    val props = new Properties()
     props.putAll(kraftProps())
     val config = KafkaConfig.fromProps(props)
     assertEquals(3, config.brokerId)
@@ -1513,7 +1529,7 @@ class KafkaConfigTest {
   @Test
   def testSaslJwksEndpointRetryDefaults(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
     val config = KafkaConfig.fromProps(props)
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@@ -1531,7 +1547,7 @@ class KafkaConfigTest {
   @Test
   def testInvalidSecurityInterBrokerProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
     val ce = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
     assertTrue(ce.getMessage.contains(KafkaConfig.InterBrokerSecurityProtocolProp))
   }
@@ -1597,7 +1613,7 @@ class KafkaConfigTest {
   def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
     val props = new Properties()
     props.putAll(kraftProps())
-    val config = new KafkaConfig(props)
+    val config = KafkaConfig.fromProps(props)
     assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981dac..a12da68f66c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2028,7 +2028,10 @@ object TestUtils extends Logging {
       fail("Expected illegal configuration but instead it was legal")
     } catch {
       case caught @ (_: ConfigException | _: IllegalArgumentException) =>
-        assertTrue(caught.getMessage.contains(expectedExceptionContainsText))
+        assertTrue(
+          caught.getMessage.contains(expectedExceptionContainsText),
+          s""""${caught.getMessage}" doesn't contain "$expectedExceptionContainsText""""
+        )
     }
   }
 
@@ -2312,4 +2315,4 @@ object TestUtils extends Logging {
       timedOut.set(true)
     }
   }
-}
\ No newline at end of file
+}