You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/24 22:47:33 UTC

[kafka] branch 3.3 updated (7bd43f5466d -> e8c6a7bb69f)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 7bd43f5466d KAFKA-14248; Fix flaky test PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs (#12669)
     new 5992f5141c3 MINOR: Add initial property tests for StandardAuthorizer (#12703)
     new e8c6a7bb69f MINOR; Fix error message when validating KRaft config (#12717)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/common/internals/Topic.java   |  30 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  18 +-
 .../ControllerConfigurationValidatorTest.scala     |   4 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 524 +++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 .../controller/ReplicationControlManagerTest.java  |   4 +-
 .../authorizer/StandardAuthorizerPropertyTest.java | 299 ++++++++++++
 9 files changed, 613 insertions(+), 276 deletions(-)
 create mode 100644 metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java


[kafka] 02/02: MINOR; Fix error message when validating KRaft config (#12717)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e8c6a7bb69fe30cdcfb41cb648276083f74f65ba
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Oct 7 10:40:55 2022 -0700

    MINOR; Fix error message when validating KRaft config (#12717)
    
    The error message reported when advertised.listeners is set while
    running in controller-only mode is confusing. When the KRaft server is
    configured as controller-only, the following must be true:
    
    1. `advertised.listeners` is not set
    2. `listeners` contains a listener for every name in `controller.listener.names`
    3. `controller.listener.names` contains a name for every listener in `listeners`
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Igor Soarez <i...@soarez.me>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  18 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 524 +++++++++++----------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 3 files changed, 285 insertions(+), 264 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2df40e97b72..ca7c474fdcf 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2162,14 +2162,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       // KRaft controller-only
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
-      // advertised listeners must be empty when not also running the broker role
-      val sourceOfAdvertisedListeners: String =
-        if (getString(KafkaConfig.AdvertisedListenersProp) != null)
-          s"${KafkaConfig.AdvertisedListenersProp}"
-        else
-          s"${KafkaConfig.ListenersProp}"
-      require(effectiveAdvertisedListeners.isEmpty,
-        s"The $sourceOfAdvertisedListeners config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+      // advertised listeners must be empty when only the controller is configured
+      require(
+        getString(KafkaConfig.AdvertisedListenersProp) == null,
+        s"The ${KafkaConfig.AdvertisedListenersProp} config must be empty when ${KafkaConfig.ProcessRolesProp}=controller"
+      )
+      // listeners should only contain listeners also enumerated in the controller listener
+      require(
+        effectiveAdvertisedListeners.isEmpty,
+        s"The ${KafkaConfig.ListenersProp} config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller"
+      )
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateControllerListenerExistsForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 78adff5f8fc..b9eee175503 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -49,7 +49,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -58,7 +58,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeMinutesProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -67,7 +67,7 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -84,8 +84,8 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
-    props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+    props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -94,8 +94,8 @@ class KafkaConfigTest {
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
-    props.put(KafkaConfig.LogRetentionTimeMinutesProp, "10")
+    props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "10")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -109,9 +109,9 @@ class KafkaConfigTest {
     val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
     val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
 
-    props1.put("log.retention.ms", "-1")
-    props2.put("log.retention.minutes", "-1")
-    props3.put("log.retention.hours", "-1")
+    props1.setProperty("log.retention.ms", "-1")
+    props2.setProperty("log.retention.minutes", "-1")
+    props3.setProperty("log.retention.hours", "-1")
 
     val cfg1 = KafkaConfig.fromProps(props1)
     val cfg2 = KafkaConfig.fromProps(props2)
@@ -120,13 +120,13 @@ class KafkaConfigTest {
     assertEquals(-1, cfg2.logRetentionTimeMillis, "Should be -1")
     assertEquals(-1, cfg3.logRetentionTimeMillis, "Should be -1")
 
-    props4.put("log.retention.ms", "-1")
-    props4.put("log.retention.minutes", "30")
+    props4.setProperty("log.retention.ms", "-1")
+    props4.setProperty("log.retention.minutes", "30")
 
     val cfg4 = KafkaConfig.fromProps(props4)
     assertEquals(-1, cfg4.logRetentionTimeMillis, "Should be -1")
 
-    props5.put("log.retention.ms", "0")
+    props5.setProperty("log.retention.ms", "0")
 
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props5))
   }
@@ -137,9 +137,9 @@ class KafkaConfigTest {
     val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
 
-    props1.put("log.retention.ms", "0")
-    props2.put("log.retention.minutes", "0")
-    props3.put("log.retention.hours", "0")
+    props1.setProperty("log.retention.ms", "0")
+    props2.setProperty("log.retention.minutes", "0")
+    props3.setProperty("log.retention.hours", "0")
 
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
@@ -152,9 +152,9 @@ class KafkaConfigTest {
     val port = 9999
     val hostName = "fake-host"
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
     val serverConfig = KafkaConfig.fromProps(props)
 
     val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -170,7 +170,7 @@ class KafkaConfigTest {
     val advertisedPort = 1234
 
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
 
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -183,35 +183,35 @@ class KafkaConfigTest {
   @Test
   def testDuplicateListeners(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
     // listeners with duplicate port
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
     assertBadConfigContainingMessage(props, "Each listener must have a different port")
 
     // listeners with duplicate name
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
     assertBadConfigContainingMessage(props, "Each listener must have a different name")
 
     // advertised listeners can have duplicate ports
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
-    props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
-    props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "HOST")
+    props.setProperty(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
     KafkaConfig.fromProps(props)
 
     // but not duplicate names
-    props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
     assertBadConfigContainingMessage(props, "Each listener must have a different name")
   }
 
   @Test
   def testControlPlaneListenerName(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
-    props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put("control.plane.listener.name", "CONTROLLER")
+    props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty("control.plane.listener.name", "CONTROLLER")
     KafkaConfig.fromProps(props)
 
     val serverConfig = KafkaConfig.fromProps(props)
@@ -234,12 +234,12 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerNames(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
 
     val serverConfig = KafkaConfig.fromProps(props)
     val controllerEndpoints = serverConfig.controllerListeners
@@ -253,12 +253,12 @@ class KafkaConfigTest {
   @Test
   def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    props.put(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
@@ -270,119 +270,119 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerDefinedForKRaftController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertBadConfigContainingMessage(props, "The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
 
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testControllerListenerDefinedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "controller.listener.names must contain at least one value when running KRaft with just the broker role")
 
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller,broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
     KafkaConfig.fromProps(props)
 
     // change each of the 4 ports to port 5555 -- should pass in all circumstances since we can't validate the
     // controller.quorum.voters ports (which are the ports that clients use and are semantically "advertised" ports
     // even though the controller configuration doesn't list them in advertised.listeners) against the
     // listener ports (which are semantically different then the ports that clients use).
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testSeparateControllerListenerDefinedForKRaftBrokerController(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
     assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
 
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
     KafkaConfig.fromProps(props)
 
     // confirm that redirecting via listener.security.protocol.map is acceptable
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testControllerListenerNameMapsToPlaintextByDefaultForKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     val controllerListenerName = new ListenerName("CONTROLLER")
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when there is a SSL or SASL controller listener
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
     val controllerNotFoundInMapMessage = "Controller listener with name CONTROLLER defined in controller.listener.names not found in listener.security.protocol.map"
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     // ensure we don't map it to PLAINTEXT when there is a SSL or SASL listener
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9092")
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     props.remove(KafkaConfig.ListenersProp)
     // ensure we don't map it to PLAINTEXT when it is explicitly mapped otherwise
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
     assertEquals(Some(SecurityProtocol.SSL),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
     // ensure we don't map it to PLAINTEXT when anything is explicitly given
     // (i.e. it is only part of the default value, even with KRaft)
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
     assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
     // ensure we can map it to a non-PLAINTEXT security protocol by default (i.e. when nothing is given)
     props.remove(KafkaConfig.ListenerSecurityProtocolMapProp)
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     assertEquals(Some(SecurityProtocol.SSL),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("SSL")))
   }
@@ -390,11 +390,11 @@ class KafkaConfigTest {
   @Test
   def testMultipleControllerListenerNamesMapToPlaintextByDefaultForKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
       KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER1")))
     assertEquals(Some(SecurityProtocol.PLAINTEXT),
@@ -404,22 +404,22 @@ class KafkaConfigTest {
   @Test
   def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
     assertBadConfigContainingMessage(props,
       "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
     // Valid now
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
     assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
   }
 
   @Test
   def testBadListenerProtocol(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
 
     assertFalse(isValidKafkaConfig(props))
   }
@@ -427,12 +427,12 @@ class KafkaConfigTest {
   @Test
   def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
     val config = KafkaConfig.fromProps(props)
     val expectedListeners = Seq(
       EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
@@ -451,13 +451,13 @@ class KafkaConfigTest {
   @Test
   def testListenerAndAdvertisedListenerNames(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+    props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
     val config = KafkaConfig.fromProps(props)
 
     val expectedListeners = Seq(
@@ -482,43 +482,43 @@ class KafkaConfigTest {
   @Test
   def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testCaseInsensitiveListenerProtocol(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
-    props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
     val config = KafkaConfig.fromProps(props)
     assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
     assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
@@ -531,8 +531,8 @@ class KafkaConfigTest {
   @Test
   def testListenerDefaults(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
 
     // configuration with no listeners
     val conf = KafkaConfig.fromProps(props)
@@ -545,21 +545,21 @@ class KafkaConfigTest {
   @Test
   def testVersionConfiguration(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.BrokerIdProp, "1")
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.BrokerIdProp, "1")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
     val conf = KafkaConfig.fromProps(props)
     assertEquals(MetadataVersion.latest, conf.interBrokerProtocolVersion)
 
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
     // We need to set the message format version to make the configuration valid.
-    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
+    props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
     val conf2 = KafkaConfig.fromProps(props)
     assertEquals(IBP_0_8_2, conf2.interBrokerProtocolVersion)
 
     // check that 0.8.2.0 is the same as 0.8.2.1
-    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
+    props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
     // We need to set the message format version to make the configuration valid
-    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
+    props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
     val conf3 = KafkaConfig.fromProps(props)
     assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion)
 
@@ -587,7 +587,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionDisabled(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
@@ -596,7 +596,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionEnabled(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@@ -605,7 +605,7 @@ class KafkaConfigTest {
   @Test
   def testUncleanElectionInvalid(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
+    props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
 
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
@@ -613,7 +613,7 @@ class KafkaConfigTest {
   @Test
   def testLogRollTimeMsProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -622,8 +622,8 @@ class KafkaConfigTest {
   @Test
   def testLogRollTimeBothMsAndHoursProvided(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
-    props.put(KafkaConfig.LogRollTimeHoursProp, "1")
+    props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
+    props.setProperty(KafkaConfig.LogRollTimeHoursProp, "1")
 
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -648,7 +648,7 @@ class KafkaConfigTest {
   @Test
   def testValidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put("compression.type", "gzip")
+    props.setProperty("compression.type", "gzip")
     val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.compressionType, "gzip")
@@ -657,34 +657,34 @@ class KafkaConfigTest {
   @Test
   def testInvalidCompressionType(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.CompressionTypeProp, "abc")
+    props.setProperty(KafkaConfig.CompressionTypeProp, "abc")
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testInvalidInterBrokerSecurityProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+    props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:0")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
     assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
   }
 
   @Test
   def testEqualAdvertisedListenersProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
     KafkaConfig.fromProps(props)
   }
 
   @Test
   def testInvalidAdvertisedListenersProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
     assertBadConfigContainingMessage(props, "No security protocol defined for listener TRACE")
 
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
+    props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
     assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
   }
 
@@ -693,8 +693,8 @@ class KafkaConfigTest {
   def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
     def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: MetadataVersion): KafkaConfig = {
       val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-      props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
-      props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+      props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+      props.setProperty(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
       KafkaConfig.fromProps(props)
     }
 
@@ -718,7 +718,7 @@ class KafkaConfigTest {
   def testFromPropsInvalid(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
       validRequiredProperties
     }
     // to ensure a basis is valid - bootstraps all needed validation
@@ -988,7 +988,7 @@ class KafkaConfigTest {
   def testDynamicLogConfigs(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
       validRequiredProperties
     }
 
@@ -997,7 +997,7 @@ class KafkaConfigTest {
 
     def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = {
       val initial = accessor()
-      props.put(property, value.toString)
+      props.setProperty(property, value.toString)
       config.updateCurrentConfig(new KafkaConfig(props))
       assertNotEquals(initial, accessor())
     }
@@ -1069,23 +1069,23 @@ class KafkaConfigTest {
   @Test
   def testSpecificProperties(): Unit = {
     val defaults = new Properties()
-    defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
-    defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
-    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
-    defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
-    defaults.put(KafkaConfig.BrokerIdProp, "1")
-    defaults.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
-    defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
-    defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
-    defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
-    defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
-    defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
+    defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
+    defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
+    defaults.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
+    defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
+    defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
+    defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12")
+    defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11")
+    defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10")
     //For LogFlushIntervalMsProp
-    defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
-    defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
+    defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+    defaults.setProperty(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
     // For MetricRecordingLevelProp
-    defaults.put(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
+    defaults.setProperty(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
 
     val config = KafkaConfig.fromProps(defaults)
     assertEquals("127.0.0.1:2181", config.zkConnect)
@@ -1107,7 +1107,7 @@ class KafkaConfigTest {
     assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
     assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
 
-    defaults.put(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
+    defaults.setProperty(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
     val config1 = KafkaConfig.fromProps(defaults)
     assertEquals(true, config1.tokenAuthEnabled)
   }
@@ -1115,19 +1115,19 @@ class KafkaConfigTest {
   @Test
   def testNonroutableAdvertisedListeners(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
+    props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testMaxConnectionsPerIpProp(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.MaxConnectionsPerIpProp, "0")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
     assertFalse(isValidKafkaConfig(props))
-    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
     KafkaConfig.fromProps(props)
-    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+    props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1145,19 +1145,19 @@ class KafkaConfigTest {
   @Test
   def testDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
 
     // invalid due to extra listener also appearing in controller listeners
     assertBadConfigContainingMessage(props,
       "controller.listener.names must not contain a value appearing in the 'listeners' configuration when running KRaft with just the broker role")
 
     // Valid now
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
     KafkaConfig.fromProps(props)
 
     // Also valid if we let advertised listeners be derived from listeners/controller.listener.names
@@ -1169,19 +1169,19 @@ class KafkaConfigTest {
   @Test
   def testControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
-    props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+    props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
     assertBadConfigContainingMessage(props,
       "The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role")
 
     // Valid now
-    props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
     KafkaConfig.fromProps(props)
 
     // Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
@@ -1191,28 +1191,44 @@ class KafkaConfigTest {
 
   @Test
   def testAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {
-    val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
-    props.put(KafkaConfig.NodeIdProp, "2")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
-    val expectedExceptionContainsTextSuffix = " config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller"
-    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
-
-    // Still invalid due to extra listener if we set advertised listeners explicitly to be correct
+    // Test that advertised listeners cannot be set when KRaft and server is controller only.
+    // Test that listeners must enumerate every controller listener
+    // Test that controller listener must enumerate every listener
     val correctListeners = "PLAINTEXT://A:9092,SSL://B:9093"
-    props.put(KafkaConfig.AdvertisedListenersProp, correctListeners)
-    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+    val incorrectListeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
 
-    // Still invalid due to extra listener if we allow advertised listeners to derive from listeners/controller.listener.names
+    val correctControllerListenerNames = "PLAINTEXT,SSL"
+
+    val props = new Properties()
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, incorrectListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
+    props.setProperty(KafkaConfig.NodeIdProp, "2")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    var expectedExceptionContainsText = "The advertised.listeners config must be empty when process.roles=controller"
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Invalid if advertised listeners is explicitly to the set
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, correctListeners)
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Invalid if listeners contains names not in controller.listener.names
     props.remove(KafkaConfig.AdvertisedListenersProp)
-    assertBadConfigContainingMessage(props, "The listeners" + expectedExceptionContainsTextSuffix)
+    props.setProperty(KafkaConfig.ListenersProp, incorrectListeners)
+    expectedExceptionContainsText = """The listeners config must only contain KRaft controller listeners from
+    |controller.listener.names when process.roles=controller""".stripMargin.replaceAll("\n", " ")
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Invalid if listeners doesn't contain every name in controller.listener.names
+    props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames + ",SASL_SSL")
+    expectedExceptionContainsText = """controller.listener.names must only contain values appearing in the 'listeners'
+    |configuration when running the KRaft controller role""".stripMargin.replaceAll("\n", " ")
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
 
     // Valid now
-    props.put(KafkaConfig.ListenersProp, correctListeners)
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
     KafkaConfig.fromProps(props)
   }
 
@@ -1244,7 +1260,7 @@ class KafkaConfigTest {
 
   private def assertInvalidQuorumVoters(value: String): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.QuorumVotersProp, value)
+    props.setProperty(KafkaConfig.QuorumVotersProp, value)
     assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
   }
 
@@ -1269,7 +1285,7 @@ class KafkaConfigTest {
 
   private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.QuorumVotersProp, value)
+    props.setProperty(KafkaConfig.QuorumVotersProp, value)
     val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
@@ -1281,11 +1297,11 @@ class KafkaConfigTest {
     // and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
     val largeBrokerId = 2000
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.NodeIdProp, largeBrokerId.toString)
     KafkaConfig.fromProps(props)
   }
 
@@ -1293,7 +1309,7 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1301,7 +1317,7 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1309,9 +1325,9 @@ class KafkaConfigTest {
   def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
     // -1 is the default for both node.id and broker.id
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1323,8 +1339,8 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1333,8 +1349,8 @@ class KafkaConfigTest {
     // -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
     val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
     KafkaConfig.fromProps(props)
   }
 
@@ -1344,10 +1360,10 @@ class KafkaConfigTest {
     val negativeTwoNodeId = -2
     val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
-    props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
+    props.setProperty(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
     assertFalse(isValidKafkaConfig(props))
   }
 
@@ -1358,9 +1374,9 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     KafkaConfig.fromProps(props)
   }
 
@@ -1369,27 +1385,27 @@ class KafkaConfigTest {
     // -1 is the default for both node.id and broker.id
     val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    props.setProperty(KafkaConfig.ListenersProp, listeners)
+    props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
   def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
   }
 
@@ -1399,12 +1415,12 @@ class KafkaConfigTest {
     val dataDir = "/path/to/data/dir"
 
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
-    props.put(KafkaConfig.LogDirProp, dataDir)
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir)
+    props.setProperty(KafkaConfig.LogDirProp, dataDir)
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
@@ -1418,11 +1434,11 @@ class KafkaConfigTest {
     val dataDir2 = "/path/to/data/dir/2"
 
     val props = new Properties()
-    props.put(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
-    props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
+    props.setProperty(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
@@ -1478,7 +1494,7 @@ class KafkaConfigTest {
   def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.setProperty(KafkaConfig.BrokerIdProp, "3")
     props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     val config = KafkaConfig.fromProps(props)
@@ -1500,7 +1516,7 @@ class KafkaConfigTest {
 
   @Test
   def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
-    val props = new Properties(kraftProps())
+    val props = new Properties()
     props.putAll(kraftProps())
     val config = KafkaConfig.fromProps(props)
     assertEquals(3, config.brokerId)
@@ -1513,7 +1529,7 @@ class KafkaConfigTest {
   @Test
   def testSaslJwksEndpointRetryDefaults(): Unit = {
     val props = new Properties()
-    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
     val config = KafkaConfig.fromProps(props)
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
     assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@@ -1531,7 +1547,7 @@ class KafkaConfigTest {
   @Test
   def testInvalidSecurityInterBrokerProtocol(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
-    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
+    props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
     val ce = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
     assertTrue(ce.getMessage.contains(KafkaConfig.InterBrokerSecurityProtocolProp))
   }
@@ -1597,7 +1613,7 @@ class KafkaConfigTest {
   def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
     val props = new Properties()
     props.putAll(kraftProps())
-    val config = new KafkaConfig(props)
+    val config = KafkaConfig.fromProps(props)
     assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981dac..a12da68f66c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2028,7 +2028,10 @@ object TestUtils extends Logging {
       fail("Expected illegal configuration but instead it was legal")
     } catch {
       case caught @ (_: ConfigException | _: IllegalArgumentException) =>
-        assertTrue(caught.getMessage.contains(expectedExceptionContainsText))
+        assertTrue(
+          caught.getMessage.contains(expectedExceptionContainsText),
+          s""""${caught.getMessage}" doesn't contain "$expectedExceptionContainsText""""
+        )
     }
   }
 
@@ -2312,4 +2315,4 @@ object TestUtils extends Logging {
       timedOut.set(true)
     }
   }
-}
\ No newline at end of file
+}


[kafka] 01/02: MINOR: Add initial property tests for StandardAuthorizer (#12703)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5992f5141c3e7fffc709e37ffebee9280657bd84
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 4 16:31:43 2022 -0700

    MINOR: Add initial property tests for StandardAuthorizer (#12703)
    
    In https://github.com/apache/kafka/pull/12695, we discovered a gap in our testing of `StandardAuthorizer`. We addressed the specific case that was failing, but I think we need to establish a better methodology for testing which incorporates randomized inputs. This patch is a start in that direction. We implement a few basic property tests using jqwik which focus on prefix searching. It catches the case from https://github.com/apache/kafka/pull/12695 prior to the fix. In the future, we [...]
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   2 +
 .../org/apache/kafka/common/internals/Topic.java   |  30 ++-
 .../ControllerConfigurationValidatorTest.scala     |   4 +-
 .../controller/ReplicationControlManagerTest.java  |   4 +-
 .../authorizer/StandardAuthorizerPropertyTest.java | 299 +++++++++++++++++++++
 6 files changed, 328 insertions(+), 12 deletions(-)

diff --git a/build.gradle b/build.gradle
index ee99cb7c870..56511cf61ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1152,6 +1152,7 @@ project(':metadata') {
     implementation libs.metrics
     compileOnly libs.log4j
     testImplementation libs.junitJupiter
+    testImplementation libs.jqwik
     testImplementation libs.hamcrest
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoInline
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 45dc28d9e3a..44260f360bb 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -36,6 +36,7 @@
   <allow pkg="javax.net.ssl" />
   <allow pkg="javax.security" />
   <allow pkg="org.ietf.jgss" />
+  <allow pkg="net.jqwik.api" />
 
   <!-- no one depends on the server -->
   <disallow pkg="kafka" />
@@ -276,6 +277,7 @@
       <allow pkg="org.apache.kafka.common.resource" />
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
+      <allow pkg="org.apache.kafka.common.internals" />
     </subpackage>
     <subpackage name="bootstrap">
       <allow pkg="org.apache.kafka.snapshot" />
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index fbf491bab98..92952a2c031 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -43,17 +43,31 @@ public class Topic {
         });
     }
 
-    public static void validate(String name, String logPrefix, Consumer<String> throwableConsumer) {
+    private static String detectInvalidTopic(String name) {
         if (name.isEmpty())
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be empty");
-        if (".".equals(name) || "..".equals(name))
-            throwableConsumer.accept(logPrefix + " cannot be \".\" or \"..\"");
+            return "the empty string is not allowed";
+        if (".".equals(name))
+            return "'.' is not allowed";
+        if ("..".equals(name))
+            return "'..' is not allowed";
         if (name.length() > MAX_NAME_LENGTH)
-            throwableConsumer.accept(logPrefix + " is illegal, it can't be longer than " + MAX_NAME_LENGTH +
-                    " characters, " + logPrefix + ": " + name);
+            return "the length of '" + name + "' is longer than the max allowed length " + MAX_NAME_LENGTH;
         if (!containsValidPattern(name))
-            throwableConsumer.accept(logPrefix + " \"" + name + "\" is illegal, it contains a character other than " +
-                    "ASCII alphanumerics, '.', '_' and '-'");
+            return "'" + name + "' contains one or more characters other than " +
+                "ASCII alphanumerics, '.', '_' and '-'";
+        return null;
+    }
+
+    public static boolean isValid(String name) {
+        String reasonInvalid = detectInvalidTopic(name);
+        return reasonInvalid == null;
+    }
+
+    public static void validate(String name, String logPrefix, Consumer<String> throwableConsumer) {
+        String reasonInvalid = detectInvalidTopic(name);
+        if (reasonInvalid != null) {
+            throwableConsumer.accept(logPrefix + " is invalid: " +  reasonInvalid);
+        }
     }
 
     public static boolean isInternal(String topic) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index c89910ed231..36a8d71fb97 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -39,8 +39,8 @@ class ControllerConfigurationValidatorTest {
 
   @Test
   def testInvalidTopicNameRejected(): Unit = {
-    assertEquals("Topic name \"(<-invalid->)\" is illegal, it contains a character " +
-      "other than ASCII alphanumerics, '.', '_' and '-'",
+    assertEquals("Topic name is invalid: '(<-invalid->)' contains " +
+      "one or more characters other than ASCII alphanumerics, '.', '_' and '-'",
         assertThrows(classOf[InvalidTopicException], () => validator.validate(
           new ConfigResource(TOPIC, "(<-invalid->)"), emptyMap())). getMessage())
   }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 82e378a9823..335fd52a07d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -809,9 +809,9 @@ public class ReplicationControlManagerTest {
         ReplicationControlManager.validateNewTopicNames(topicErrors, topics, Collections.emptyMap());
         Map<String, ApiError> expectedTopicErrors = new HashMap<>();
         expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name is illegal, it can't be empty"));
+            "Topic name is invalid: the empty string is not allowed"));
         expectedTopicErrors.put(".", new ApiError(INVALID_TOPIC_EXCEPTION,
-            "Topic name cannot be \".\" or \"..\""));
+            "Topic name is invalid: '.' is not allowed"));
         assertEquals(expectedTopicErrors, topicErrors);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
new file mode 100644
index 00000000000..8935b92e2f9
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.authorizer;
+
+import net.jqwik.api.Assume;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.constraints.AlphaChars;
+import net.jqwik.api.constraints.Chars;
+import net.jqwik.api.constraints.NumericChars;
+import net.jqwik.api.constraints.Size;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.common.security.auth.KafkaPrincipal.USER_TYPE;
+import static org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.PLAINTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class StandardAuthorizerPropertyTest {
+
+    @Target({ ElementType.ANNOTATION_TYPE, ElementType.PARAMETER, ElementType.TYPE_USE })
+    @Retention(RetentionPolicy.RUNTIME)
+    @AlphaChars @NumericChars @Chars({ '_', '-', '.' })
+    public @interface ValidTopicChars { }
+
+    @Property(tries = 5000)
+    public void matchingPrefixDenyOverridesAllAllowRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        String topicPrefix = topic.substring(0, random.nextInt(topic.length()));
+        StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralDenyOverridesAllAllowRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one DENY rule which matches and zero or more ALLOW rules which may or
+        // may not match. Regardless of the ALLOW rules, the final result should be DENIED.
+
+        StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.DENY);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+        addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.DENIED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingPrefixAllowWithNoMatchingDenyRules(
+        @ForAll Random random,
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        String topicPrefix = topic.substring(0, random.nextInt(topic.length()));
+        StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    @Property(tries = 5000)
+    public void matchingLiteralAllowWithNoMatchingDenyRules(
+        @ForAll @ValidTopicChars String topic,
+        @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes
+    ) throws Exception {
+        Assume.that(Topic.isValid(topic));
+        StandardAuthorizer authorizer = buildAuthorizer();
+
+        // Create one ALLOW rule which matches and zero or more DENY rules which do not
+        // match. The final result should be ALLOWED.
+
+        StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.ALLOW);
+        authorizer.addAcl(Uuid.randomUuid(), denyRule);
+
+        addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes);
+
+        assertAuthorizationResult(
+            authorizer,
+            AuthorizationResult.ALLOWED,
+            AclOperation.WRITE,
+            new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
+        );
+    }
+
+    private StandardAuthorizer buildAuthorizer() {
+        StandardAuthorizer authorizer = new StandardAuthorizer();
+        authorizer.start(new StandardAuthorizerTest.AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
+        authorizer.completeInitialLoad();
+        return authorizer;
+    }
+
+    private void assertAuthorizationResult(
+        StandardAuthorizer authorizer,
+        AuthorizationResult expectedResult,
+        AclOperation operation,
+        ResourcePattern pattern
+    ) throws Exception {
+        Action action = new Action(operation, pattern, 1, false, false);
+        List<AuthorizationResult> results = authorizer.authorize(
+            newRequestContext(),
+            Collections.singletonList(action)
+        );
+
+        assertEquals(1, results.size());
+        AuthorizationResult actualResult = results.get(0);
+
+        try {
+            assertEquals(expectedResult, actualResult);
+        } catch (Throwable e) {
+            printCounterExample(authorizer, operation, pattern, actualResult);
+            throw e;
+        }
+    }
+
+    private void printCounterExample(
+        StandardAuthorizer authorizer,
+        AclOperation operation,
+        ResourcePattern resourcePattern,
+        AuthorizationResult result
+    ) {
+        System.out.println("Assertion FAILED: Operation " + operation + " on " +
+            resourcePattern + " is " + result + ". Current ACLS:");
+
+        Iterable<AclBinding> allAcls = authorizer.acls(new AclBindingFilter(
+            new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY),
+            new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)
+        ));
+
+        allAcls.forEach(System.out::println);
+    }
+
+    private static AuthorizableRequestContext newRequestContext() throws Exception {
+        return new MockAuthorizableRequestContext.Builder()
+            .setPrincipal(new KafkaPrincipal(USER_TYPE, "user"))
+            .build();
+    }
+
+    private static StandardAcl buildTopicWriteAcl(
+        String resourceName,
+        PatternType patternType,
+        AclPermissionType permissionType
+    ) {
+        return new StandardAcl(
+            ResourceType.TOPIC,
+            resourceName,
+            patternType,
+            "User:*",
+            "*",
+            AclOperation.WRITE,
+            permissionType
+        );
+    }
+
+    private boolean isPrefix(
+        String value,
+        String prefix
+    ) {
+        if (prefix.length() > value.length()) {
+            return false;
+        } else {
+            String matchingPrefix = value.substring(0, prefix.length());
+            return matchingPrefix.equals(prefix);
+        }
+    }
+
+    private void addRandomNonMatchingPrefixDenyAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.DENY,
+            pattern -> !pattern.isEmpty() && !isPrefix(topic, pattern)
+        );
+    }
+
+    private void addRandomPrefixAllowAcls(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes
+    ) {
+        addRandomPrefixRules(
+            authorizer,
+            topic,
+            randomSuffixes,
+            AclPermissionType.ALLOW,
+            pattern -> !pattern.isEmpty()
+        );
+    }
+    
+    private void addRandomPrefixRules(
+        StandardAuthorizer authorizer,
+        String topic,
+        Set<String> randomSuffixes,
+        AclPermissionType permissionType,
+        Predicate<String> patternFilter
+    ) {
+        Set<String> prefixPatterns = new HashSet<>();
+
+        for (int i = 0; i < topic.length(); i++) {
+            String prefix = topic.substring(0, i);
+            for (String randomSuffix : randomSuffixes) {
+                String pattern = prefix + randomSuffix;
+                if (patternFilter.test(pattern)) {
+                    prefixPatterns.add(pattern);
+                }
+            }
+        }
+
+        for (String randomResourcePattern : prefixPatterns) {
+            authorizer.addAcl(Uuid.randomUuid(), buildTopicWriteAcl(
+                randomResourcePattern,
+                PatternType.PREFIXED,
+                permissionType
+            ));
+        }        
+    }
+
+}