You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/12/10 21:30:46 UTC

[kafka] branch trunk updated: KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff6cf67  KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)
ff6cf67 is described below

commit ff6cf670f49301a325342b6d7b598a1b462a012c
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Dec 10 16:28:57 2021 -0500

    KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)
    
    This patch tightens the configuration checks related to KRaft configs by adding the following constraints:
    
    * `control.plane.listener.name` is confirmed to be empty in KRaft mode whenever a config object is created as opposed to later when the broker is given the config and tries to start.
    * `controller.listener.names` is required to be empty for the non-KRaft (i.e. ZooKeeper) case.  A ZooKeeper-based cluster that sets this config will fail to restart until this config is removed.
    * There must be no advertised listeners when running just a KRaft controller (i.e. when `process.roles=controller`).  This means neither `listeners` nor `advertised.listeners` (if the latter is explicitly defined) can contain a listener that does not also appear in `controller.listener.names`.
    * When running a KRaft broker (i.e. when `process.roles=broker` or `process.roles=broker,controller`), advertised listeners (which were already checked to be non-empty via the check that the inter-broker listener appears there) must not include any listeners appearing in `controller.listener.names`.
    * When running a KRaft controller (i.e. when `process.roles=controller` or `process.roles=broker,controller`), `controller.listener.names` must be non-empty and every entry must appear in `listeners`.
    * When running just a KRaft broker (i.e. when `process.roles=broker`), `controller.listener.names` must be non-empty and none of its entries can appear in `listeners`.  This was indirectly checked previously, but the indirect checks did not catch all cases.
    * When running just a KRaft broker we log a warning if more than one entry appears in `controller.listener.names` because only the first entry is used.
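    * We also map configured controller listener names to the `PLAINTEXT` security protocol by default, provided that `listener.security.protocol.map` is not explicitly configured and no SSL or SASL security protocols (`SSL`, `SASL_SSL`, `SASL_PLAINTEXT`) are in use by any listener or controller listener.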
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/EndPoint.scala   |  10 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../server/BrokerToControllerChannelManager.scala  |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 169 ++++++++---
 core/src/main/scala/kafka/server/KafkaServer.scala |   6 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   8 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |   3 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala    |  42 ++-
 .../unit/kafka/network/ConnectionQuotasTest.scala  |   1 -
 .../unit/kafka/network/SocketServerTest.scala      |   3 +-
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |  14 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |   3 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   1 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 330 ++++++++++++++++-----
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  11 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |   1 +
 .../test/scala/unit/kafka/server/ServerTest.scala  |   3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  16 +-
 21 files changed, 482 insertions(+), 159 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 2f8229a..3e84f9e 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -17,11 +17,12 @@
 
 package kafka.cluster
 
-import org.apache.kafka.common.{Endpoint => JEndpoint, KafkaException}
+import org.apache.kafka.common.{KafkaException, Endpoint => JEndpoint}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
+import java.util.Locale
 import scala.collection.Map
 
 object EndPoint {
@@ -57,6 +58,13 @@ object EndPoint {
       case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
     }
   }
+
+  def parseListenerName(connectionString: String): String = {
+    connectionString match {
+      case uriParseExp(listenerNameString, _, _) => listenerNameString.toUpperCase(Locale.ROOT)
+      case _ => throw new KafkaException(s"Unable to parse a listener name from $connectionString")
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 279e596..4c29250 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -227,7 +227,7 @@ class KafkaRaftManager[T](
 
   private def buildNetworkClient(): NetworkClient = {
     val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
-    val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+    val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
       controllerSecurityProtocol,
       JaasContext.Type.SERVER,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 839ee17..0871030 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -324,7 +324,7 @@ class BrokerServer(
                                                     metadataSnapshotter)
 
       val networkListeners = new ListenerCollection()
-      config.advertisedListeners.foreach { ep =>
+      config.effectiveAdvertisedListeners.foreach { ep =>
         networkListeners.add(new Listener().
           setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
           setName(ep.listenerName.value()).
@@ -404,10 +404,6 @@ class BrokerServer(
         config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
         SocketServer.DataPlaneThreadPrefix)
 
-      if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
-        throw new RuntimeException(KafkaConfig.ControlPlaneListenerNameProp + " is not " +
-          "supported when in KRaft mode.")
-      }
       // Block until we've caught up with the latest metadata from the controller quorum.
       lifecycleManager.initialCatchUpFuture.get()
 
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 132a2cc..b671c70 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -82,7 +82,7 @@ object RaftControllerNodeProvider {
             config: KafkaConfig,
             controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
     val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
-    val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+    val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
     val controllerSaslMechanism = config.saslMechanismControllerProtocol
     new RaftControllerNodeProvider(
       raftManager,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cf8021c..fef9f19 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -910,12 +910,12 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
       throw new ConfigException("Dynamic reconfiguration of listeners is not yet supported when using a Raft-based metadata quorum")
     }
     val newListeners = listenersToMap(newConfig.listeners)
-    val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners)
+    val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
     val oldListeners = listenersToMap(oldConfig.listeners)
     if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
       throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
-    if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
-      throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.listenerSecurityProtocolMap}'")
+    if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
+      throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
     newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
       def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
         kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
@@ -926,7 +926,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
       if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != immutableListenerConfigs(oldConfig, listenerName.configPrefix))
         throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
           "restart broker or create a new listener for update")
-      if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName))
+      if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
         throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
     }
     if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a789b24..65bc35e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,7 +27,9 @@ import kafka.log.LogConfig
 import kafka.log.LogConfig.MessageFormatVersion
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
+import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
+import kafka.utils.CoreUtils.parseCsvList
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
@@ -748,7 +750,9 @@ object KafkaConfig {
     "Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " +
     "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
     "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
-    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+    "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+    "Note that in KRaft a default mapping from the listener names defined by <code>controller.listener.names</code> to PLAINTEXT " +
+    "is assumed if no explicit mapping is provided and no other security protocol is in use."
   val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
     s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
     "For example, if a broker's config is :\n" +
@@ -1887,15 +1891,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   }
 
   def listeners: Seq[EndPoint] =
-    CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), listenerSecurityProtocolMap)
+    CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), effectiveListenerSecurityProtocolMap)
 
-  def controllerListenerNames: Seq[String] =
-    Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
+  def controllerListenerNames: Seq[String] = {
+    val value = Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("")
+    if (value.isEmpty) {
+      Seq.empty
+    } else {
+      value.split(",")
+    }
+  }
 
   def controllerListeners: Seq[EndPoint] =
     listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
 
-  def saslMechanismControllerProtocol = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
+  def saslMechanismControllerProtocol: String = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
 
   def controlPlaneListener: Option[EndPoint] = {
     controlPlaneListenerName.map { listenerName =>
@@ -1912,10 +1922,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   }
 
   // Use advertised listeners if defined, fallback to listeners otherwise
-  def advertisedListeners: Seq[EndPoint] = {
+  def effectiveAdvertisedListeners: Seq[EndPoint] = {
     val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
     if (advertisedListenersProp != null)
-      CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap, requireDistinctPorts=false)
+      CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
     else
       listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
   }
@@ -1927,7 +1937,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
           s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
       case Some(name) =>
         val listenerName = ListenerName.normalised(name)
-        val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+        val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
           throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
             s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
         (listenerName, securityProtocol)
@@ -1942,7 +1952,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
       case Some(name) =>
         val listenerName = ListenerName.normalised(name)
-        val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+        val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
           throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
             s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
         Some(listenerName, securityProtocol)
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+    val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    if (usesSelfManagedQuorum && !originals.containsKey(ListenerSecurityProtocolMapProp)) {
+      // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
+      // and we are using KRaft.
+      // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
+      def isSslOrSasl(name: String): Boolean = name.equals(SecurityProtocol.SSL.name) || name.equals(SecurityProtocol.SASL_SSL.name) || name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
+      // check controller listener names (they won't appear in listeners when process.roles=broker)
+      // as well as listeners for occurrences of SSL or SASL_*
+      if (controllerListenerNames.exists(isSslOrSasl) ||
+        parseCsvList(getString(KafkaConfig.ListenersProp)).exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
+        mapValue // don't add default mappings since we found something that is SSL or SASL_*
+      } else {
+        // add the PLAINTEXT mappings for all controller listener names that are not explicitly PLAINTEXT
+        mapValue ++ controllerListenerNames.filter(!SecurityProtocol.PLAINTEXT.name.equals(_)).map(
+          new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
+      }
+    } else {
+      mapValue
     }
   }
 
@@ -1992,18 +2020,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
         throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
           s"when `process.roles` is defined (i.e. when running in KRaft mode).")
       }
-
-      // Validate process.roles with controller.quorum.voters
-      val voterIds: Set[Integer] = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet
-      if (voterIds.isEmpty) {
-        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
-      } else if (processRoles.contains(ControllerRole)) {
-        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
-        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
-      } else {
-        // Ensure that the broker's node.id is not an id in controller.quorum.voters
-        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
-      }
     }
     require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -2018,12 +2034,103 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
+    val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
+      require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+        s"The advertised.listeners config must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role")
+      // controller.listener.names must all appear in listener.security.protocol.map
+      controllerListenerNames.foreach { name =>
+        val listenerName = ListenerName.normalised(name)
+        if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
+          throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
+            s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}  (an explicit security mapping for each controller listener is required if ${KafkaConfig.ListenerSecurityProtocolMapProp} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
+        }
+      }
+      // warn that only the first controller listener is used if there is more than one
+      if (controllerListenerNames.size > 1) {
+        warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple entries; only the first will be used since ${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava}")
+      }
+      validateAdvertisedListenersNonEmptyForBroker()
+    } else if (processRoles == Set(ControllerRole)) {
+      // KRaft controller-only
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      // advertised listeners must be empty when not also running the broker role
+      val sourceOfAdvertisedListeners: String =
+        if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+          s"${KafkaConfig.AdvertisedListenersProp}"
+        else
+          s"${KafkaConfig.ListenersProp}"
+      require(effectiveAdvertisedListeners.isEmpty,
+        s"The $sourceOfAdvertisedListeners config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+      validateControllerQuorumVotersMustContainNodeIdForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+    } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+      // KRaft colocated broker and controller
+      validateNonEmptyQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      validateControllerQuorumVotersMustContainNodeIdForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+      validateAdvertisedListenersNonEmptyForBroker()
+    } else {
+      // ZK-based
+      // controller listener names must be empty when not in KRaft mode
+      require(controllerListenerNames.isEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
+      validateAdvertisedListenersNonEmptyForBroker()
+    }
 
-    val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
     val listenerNames = listeners.map(_.listenerName).toSet
     if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+      // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
+      validateAdvertisedListenersNonEmptyForBroker()
       require(advertisedListenerNames.contains(interBrokerListenerName),
         s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
           s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
@@ -2034,15 +2141,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       )
     }
 
-    require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
+    require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
 
-    // Ensure controller listeners are not in the advertised listeners list
-    require(!controllerListeners.exists(advertisedListeners.contains),
-      s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of ${KafkaConfig.ControllerListenerNamesProp}")
-
-    // validate controller.listener.name config
+    // validate control.plane.listener.name config
     if (controlPlaneListenerName.isDefined) {
       require(advertisedListenerNames.contains(controlPlaneListenerName.get),
         s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9f5674e..416895e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -497,14 +497,14 @@ class KafkaServer(
   }
 
   def createBrokerInfo: BrokerInfo = {
-    val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+    val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}")
     zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
       val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
       require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
         s" advertised listeners are already registered by broker ${broker.id}")
     }
 
-    val listeners = config.advertisedListeners.map { endpoint =>
+    val listeners = config.effectiveAdvertisedListeners.map { endpoint =>
       if (endpoint.port == 0)
         endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
       else
@@ -808,7 +808,7 @@ class KafkaServer(
 
   /** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
   def advertisedListeners: Seq[EndPoint] = {
-    config.advertisedListeners.map { endPoint =>
+    config.effectiveAdvertisedListeners.map { endPoint =>
       endPoint.copy(port = boundPort(endPoint.listenerName))
     }
   }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 0c0950a..687cd9e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1095,7 +1095,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     val listeners = config.listeners
       .map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
       .mkString(",") + s",$listenerName://localhost:0"
-    val listenerMap = config.listenerSecurityProtocolMap
+    val listenerMap = config.effectiveListenerSecurityProtocolMap
       .map { case (name, protocol) => s"${name.value}:${protocol.name}" }
       .mkString(",") + s",$listenerName:${securityProtocol.name}"
 
@@ -1174,7 +1174,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
       .filter(e => e.listenerName.value != securityProtocol.name)
       .map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
       .mkString(",")
-    val listenerMap = config.listenerSecurityProtocolMap
+    val listenerMap = config.effectiveListenerSecurityProtocolMap
       .filter { case (listenerName, _) => listenerName.value != securityProtocol.name }
       .map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" }
       .mkString(",")
@@ -1364,7 +1364,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
     val configs = servers.map { server =>
       val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
-      val newListeners = server.config.advertisedListeners.map { e =>
+      val newListeners = server.config.effectiveAdvertisedListeners.map { e =>
         if (e.listenerName.value == SecureExternal)
           s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
         else
@@ -1376,7 +1376,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     adminClient.alterConfigs(configs).all.get
     servers.foreach { server =>
       TestUtils.retry(10000) {
-        val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal)
+        val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal)
           .getOrElse(throw new IllegalStateException("External listener not found"))
         assertEquals(newHost, externalListener.host, "Config not updated")
       }
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 526c499..94bf453 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -59,8 +59,9 @@ final class KafkaMetadataLogTest {
   def testConfig(): Unit = {
     val props = new Properties()
     props.put(ProcessRolesProp, util.Arrays.asList("broker"))
-    props.put(QuorumVotersProp, "1@localhost:9092")
+    props.put(QuorumVotersProp, "1@localhost:9093")
     props.put(NodeIdProp, Int.box(2))
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(MetadataLogSegmentBytesProp, Int.box(10240))
     props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
     assertThrows(classOf[InvalidConfigurationException], () => {
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index bf056e8..8c8d3e7 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -20,10 +20,9 @@ import java.io.File
 import java.nio.file.Files
 import java.util
 import java.util.Properties
-
 import kafka.server.KafkaConfig
 import kafka.utils.Exit
-import org.apache.kafka.common.config.ConfigException
+import kafka.utils.TestUtils.assertBadConfigContainingMessage
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.internals.FatalExitError
@@ -89,7 +88,8 @@ class KafkaTest {
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
     propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
     setListenerProps(propertiesFile)
-    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+    assertBadConfigContainingMessage(propertiesFile,
+      "If process.roles contains just the 'broker' role, the node id 1 must not be included in the set of voters")
 
     // Ensure that with a valid config no exception is thrown
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -104,7 +104,8 @@ class KafkaTest {
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
     propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
     setListenerProps(propertiesFile)
-    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+    assertBadConfigContainingMessage(propertiesFile,
+      "If process.roles contains the 'controller' role, the node id 1 must be included in the set of voters")
 
     // Ensure that with a valid config no exception is thrown
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -119,7 +120,8 @@ class KafkaTest {
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
     propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
     setListenerProps(propertiesFile)
-    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+    assertBadConfigContainingMessage(propertiesFile,
+      "If process.roles contains the 'controller' role, the node id 1 must be included in the set of voters")
 
     // Ensure that with a valid config no exception is thrown
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -134,21 +136,39 @@ class KafkaTest {
     propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
     propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
     setListenerProps(propertiesFile)
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+    assertBadConfigContainingMessage(propertiesFile,
+      "If using process.roles, controller.quorum.voters must contain a parseable set of voters.")
 
     // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
     propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
-    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+    assertBadConfigContainingMessage(propertiesFile,
+      "Missing required configuration `zookeeper.connect` which has no default value.")
 
-    // Ensure that no exception is thrown once zookeeper.connect is defined
+    // Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
     propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+    propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "")
     KafkaConfig.fromProps(propertiesFile)
   }
 
   private def setListenerProps(props: Properties): Unit = {
-    props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
-    props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-    if (props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")) {
+    val hasBrokerRole = props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")
+    val hasControllerRole = props.getProperty(KafkaConfig.ProcessRolesProp).contains("controller")
+    val controllerListener = "SASL_PLAINTEXT://localhost:9092"
+    val brokerListener = "PLAINTEXT://localhost:9093"
+
+    if (hasBrokerRole || hasControllerRole) { // KRaft
+      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SASL_PLAINTEXT")
+      if (hasBrokerRole && hasControllerRole) {
+        props.setProperty(KafkaConfig.ListenersProp, s"$brokerListener,$controllerListener")
+      } else if (hasControllerRole) {
+        props.setProperty(KafkaConfig.ListenersProp, controllerListener)
+      } else if (hasBrokerRole) {
+        props.setProperty(KafkaConfig.ListenersProp, brokerListener)
+      }
+    } else { // ZK-based
+       props.setProperty(KafkaConfig.ListenersProp, brokerListener)
+    }
+    if (!(hasControllerRole & !hasBrokerRole)) { // not controller-only
       props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
       props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092") 
     }
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 10f04d8..2ebcc37 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -395,7 +395,6 @@ class ConnectionQuotasTest {
     val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
     val props = brokerPropsWithDefaultConnectionLimits
     props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
     val config = KafkaConfig.fromProps(props)
     connectionQuotas = new ConnectionQuotas(config, time, metrics)
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 27d1f9a..77a5766 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -296,7 +296,7 @@ class SocketServerTest {
     val testableServer = new TestableSocketServer(config)
     testableServer.startup(startProcessingRequests = false)
 
-    val updatedEndPoints = config.advertisedListeners.map { endpoint =>
+    val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint =>
       endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
     }.map(_.toJava)
 
@@ -1169,7 +1169,6 @@ class SocketServerTest {
   def testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend(): Unit = {
     val serverMetrics = new Metrics
     @volatile var selector: TestableSelector = null
-    props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
     val overrideServer = new SocketServer(
       KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider, apiVersionManager
     ) {
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 827d42e..a7a9519 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -43,18 +43,18 @@ class RaftManagerTest {
       props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
-      props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
-      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+      props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
       if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
-        props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
-        if (!processRoles.contains("controller")) {
+        if (processRoles.contains("controller")) { // co-located
+          props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+        } else { // broker-only
           val voterId = (nodeId.toInt + 1)
           props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
         }
-      } 
-
-      if (processRoles.contains("controller")) {
+      } else if (processRoles.contains("controller")) { // controller-only
+        props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
         props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
       }
 
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d97724d..dd3e49d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -49,6 +49,7 @@ class BrokerLifecycleManagerTest {
     properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
     properties.setProperty(KafkaConfig.NodeIdProp, "1")
     properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
+    properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
     properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
     properties
   }
@@ -80,7 +81,7 @@ class BrokerLifecycleManagerTest {
       time, controllerNodeProvider, nodeApiVersions)
     val clusterId = "x4AJGXQSRnephtTZzujw4w"
     val advertisedListeners = new ListenerCollection()
-    config.advertisedListeners.foreach { ep =>
+    config.effectiveAdvertisedListeners.foreach { ep =>
       advertisedListeners.add(new Listener().setHost(ep.host).
         setName(ep.listenerName.value()).
         setPort(ep.port.shortValue()).
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 318e17e..2176d23 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -95,7 +95,7 @@ class ControllerApisTest {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-    props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+    props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
     new ControllerApis(
       requestChannel,
       authorizer,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 777857f..98c2666 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -135,6 +135,7 @@ class KafkaApisTest {
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
       val voterId = (brokerId + 1)
       properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
+      properties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
       properties
     } else {
       TestUtils.createBrokerConfig(brokerId, "zk")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bda39e3..f204af2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,6 +21,7 @@ import kafka.api.{ApiVersion, KAFKA_0_8_2, KAFKA_3_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.log.LogConfig
 import kafka.message._
+import kafka.utils.TestUtils.assertBadConfigContainingMessage
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.{ConfigException, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
@@ -155,7 +156,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
     val serverConfig = KafkaConfig.fromProps(props)
 
-    val endpoints = serverConfig.advertisedListeners
+    val endpoints = serverConfig.effectiveAdvertisedListeners
     assertEquals(1, endpoints.size)
     val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
     assertEquals(endpoint.host, hostName)
@@ -171,7 +172,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
 
     val serverConfig = KafkaConfig.fromProps(props)
-    val endpoints = serverConfig.advertisedListeners
+    val endpoints = serverConfig.effectiveAdvertisedListeners
     val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
 
     assertEquals(endpoint.host, advertisedHostName)
@@ -186,34 +187,31 @@ class KafkaConfigTest {
 
     // listeners with duplicate port
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
-    var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("Each listener must have a different port"))
+    assertBadConfigContainingMessage(props, "Each listener must have a different port")
 
     // listeners with duplicate name
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
-    caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("Each listener must have a different name"))
+    assertBadConfigContainingMessage(props, "Each listener must have a different name")
 
     // advertised listeners can have duplicate ports
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
     props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
     props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
     props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
 
     // but not duplicate names
     props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
-    caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("Each listener must have a different name"))
+    assertBadConfigContainingMessage(props, "Each listener must have a different name")
   }
 
   @Test
-  def testControlPlaneListenerName() = {
+  def testControlPlaneListenerName(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
     props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
     props.put("control.plane.listener.name", "CONTROLLER")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
 
     val serverConfig = KafkaConfig.fromProps(props)
     val controlEndpoint = serverConfig.controlPlaneListener.get
@@ -222,10 +220,10 @@ class KafkaConfigTest {
     assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
 
     //advertised listener should contain control-plane listener
-    val advertisedEndpoints = serverConfig.advertisedListeners
-    assertFalse(advertisedEndpoints.filter { endpoint =>
+    val advertisedEndpoints = serverConfig.effectiveAdvertisedListeners
+    assertTrue(advertisedEndpoints.exists { endpoint =>
       endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
-    }.isEmpty)
+    })
 
     // interBrokerListener name should be different from control-plane listener name
     val interBrokerListenerName = serverConfig.interBrokerListenerName
@@ -233,21 +231,16 @@ class KafkaConfigTest {
   }
 
   @Test
-  def testControllerListenerName() = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
-    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
-    props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")
+  def testControllerListenerNames(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
     props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
 
     val serverConfig = KafkaConfig.fromProps(props)
-    val controlPlaneEndpoint = serverConfig.controlPlaneListener.get
-    assertEquals("localhost", controlPlaneEndpoint.host)
-    assertEquals(4000, controlPlaneEndpoint.port)
-    assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol)
-
     val controllerEndpoints = serverConfig.controllerListeners
     assertEquals(1, controllerEndpoints.size)
     val controllerEndpoint = controllerEndpoints.iterator.next()
@@ -257,19 +250,167 @@ class KafkaConfigTest {
   }
 
   @Test
-  def testControllerListenerDefined(): Unit = {
+  def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    props.put(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
+
+    assertFalse(isValidKafkaConfig(props))
+    assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
+
+    props.remove(KafkaConfig.ControlPlaneListenerNameProp)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testControllerListenerDefinedForKRaftController(): Unit = {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "controller")
-    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+
+    assertBadConfigContainingMessage(props, "The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
+
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    KafkaConfig.fromProps(props)
+
+    // confirm that redirecting via listener.security.protocol.map is acceptable
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
+    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testControllerListenerDefinedForKRaftBroker(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.NodeIdProp, "1")
-    props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+
+    assertFalse(isValidKafkaConfig(props))
+    assertBadConfigContainingMessage(props, "controller.listener.names must contain at least one value when running KRaft with just the broker role")
+
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    KafkaConfig.fromProps(props)
+
+    // confirm that redirecting via listener.security.protocol.map is acceptable
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "controller,broker")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
+    KafkaConfig.fromProps(props)
+
+    // change each of the 4 ports to port 5555 -- should pass in all circumstances since we can't validate the
+    // controller.quorum.voters ports (which are the ports that clients use and are semantically "advertised" ports
+    // even though the controller configuration doesn't list them in advertised.listeners) against the
+    // listener ports (which are semantically different from the ports that clients use).
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
+    KafkaConfig.fromProps(props)
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
+    KafkaConfig.fromProps(props)
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
+    KafkaConfig.fromProps(props)
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testSeparateControllerListenerDefinedForKRaftBrokerController(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
 
     assertFalse(isValidKafkaConfig(props))
-    val caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("controller.listener.names cannot be empty if the server has the controller role"))
+    assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
+
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+    KafkaConfig.fromProps(props)
 
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-    assertTrue(isValidKafkaConfig(props))
+    // confirm that redirecting via listener.security.protocol.map is acceptable
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testControllerListenerNameMapsToPlaintextByDefaultForKRaft(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+    val controllerListenerName = new ListenerName("CONTROLLER")
+    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+    // ensure we don't map it to PLAINTEXT when there is an SSL or SASL controller listener
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
+    val controllerNotFoundInMapMessage = "Controller listener with name CONTROLLER defined in controller.listener.names not found in listener.security.protocol.map"
+    assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+    // ensure we don't map it to PLAINTEXT when there is an SSL or SASL listener
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9092")
+    assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+    props.remove(KafkaConfig.ListenersProp)
+    // ensure we don't map it to PLAINTEXT when it is explicitly mapped otherwise
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    assertEquals(Some(SecurityProtocol.SSL),
+      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+    // ensure we don't map it to PLAINTEXT when anything is explicitly given
+    // (i.e. it is only part of the default value, even with KRaft)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+    assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+    // ensure we can map it to a non-PLAINTEXT security protocol by default (i.e. when nothing is given)
+    props.remove(KafkaConfig.ListenerSecurityProtocolMapProp)
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+    assertEquals(Some(SecurityProtocol.SSL),
+      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("SSL")))
+  }
+
+  @Test
+  def testMultipleControllerListenerNamesMapToPlaintextByDefaultForKRaft(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    props.put(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
+    props.put(KafkaConfig.NodeIdProp, "1")
+    props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER1")))
+    assertEquals(Some(SecurityProtocol.PLAINTEXT),
+      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER2")))
+  }
+
+  @Test
+  def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
+    assertBadConfigContainingMessage(props,
+      "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
+    // Valid now
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
   }
 
   @Test
@@ -297,13 +438,13 @@ class KafkaConfigTest {
       EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
       EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
     assertEquals(expectedListeners, config.listeners)
-    assertEquals(expectedListeners, config.advertisedListeners)
+    assertEquals(expectedListeners, config.effectiveAdvertisedListeners)
     val expectedSecurityProtocolMap = Map(
       new ListenerName("CLIENT") -> SecurityProtocol.SSL,
       new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
       new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
     )
-    assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap)
+    assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
   }
 
   @Test
@@ -328,13 +469,13 @@ class KafkaConfigTest {
       EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
       EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
     )
-    assertEquals(expectedAdvertisedListeners, config.advertisedListeners)
+    assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedListeners)
 
     val expectedSecurityProtocolMap = Map(
       new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
       new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
     )
-    assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap)
+    assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
   }
 
   @Test
@@ -382,7 +523,7 @@ class KafkaConfigTest {
     assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
   }
 
-  def listenerListToEndPoints(listenerList: String,
+  private def listenerListToEndPoints(listenerList: String,
                               securityProtocolMap: collection.Map[ListenerName, SecurityProtocol] = EndPoint.DefaultSecurityProtocolMap) =
     CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
 
@@ -396,7 +537,7 @@ class KafkaConfigTest {
     val conf = KafkaConfig.fromProps(props)
     assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners)
     assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
-    assertEquals(conf.advertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
+    assertEquals(conf.effectiveAdvertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
   }
 
   @nowarn("cat=deprecation")
@@ -540,12 +681,10 @@ class KafkaConfigTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
     props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
-    var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("No security protocol defined for listener TRACE"))
+    assertBadConfigContainingMessage(props, "No security protocol defined for listener TRACE")
 
     props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
-    caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
-    assertTrue(caught.getMessage.contains("advertised.listeners listener names must be equal to or a subset of the ones defined in listeners"))
+    assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
   }
 
   @nowarn("cat=deprecation")
@@ -951,7 +1090,7 @@ class KafkaConfigTest {
     assertEquals(false, config.brokerIdGenerationEnable)
     assertEquals(1, config.maxReservedBrokerId)
     assertEquals(1, config.brokerId)
-    assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.advertisedListeners.map(_.connectionString))
+    assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedListeners.map(_.connectionString))
     assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
     assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
     assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
@@ -984,7 +1123,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.MaxConnectionsPerIpProp, "0")
     assertFalse(isValidKafkaConfig(props))
     props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
     props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
     assertFalse(isValidKafkaConfig(props))
   }
@@ -1000,46 +1139,78 @@ class KafkaConfigTest {
     }
   }
 
-  def assertDistinctControllerAndAdvertisedListeners(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
-    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
-    props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+  @Test
+  def testDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+    props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+
+    // invalid due to extra listener also appearing in controller listeners
+    assertBadConfigContainingMessage(props,
+      "controller.listener.names must not contain a value appearing in the 'listeners' configuration when running KRaft with just the broker role")
+
     // Valid now
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+    KafkaConfig.fromProps(props)
 
-    // Still valid
-    val controllerListeners = "SASL_SSL"
-    props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
-    assertTrue(isValidKafkaConfig(props))
+    // Also valid if we let advertised listeners be derived from listeners/controller.listener.names
+    // since listeners and advertised.listeners are explicitly identical at this point
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
-  def assertAllControllerListenerCannotBeAdvertised(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+  def testControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
     props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    assertBadConfigContainingMessage(props,
+      "The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role")
+
     // Valid now
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+    KafkaConfig.fromProps(props)
 
-    // Invalid now
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL,SASL_SSL")
-    assertFalse(isValidKafkaConfig(props))
+    // Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
-  def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+  def testAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "controller")
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
     props.put(KafkaConfig.ListenersProp, listeners)
-    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    // Valid now
-    assertTrue(isValidKafkaConfig(props))
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+    props.put(KafkaConfig.NodeIdProp, "2")
+    props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    val expectedExceptionContainsTextSuffix = " config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller"
+    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+
+    // Still invalid due to extra listener if we set advertised listeners explicitly to be correct
+    val correctListeners = "PLAINTEXT://A:9092,SSL://B:9093"
+    props.put(KafkaConfig.AdvertisedListenersProp, correctListeners)
+    assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+
+    // Still invalid due to extra listener if we allow advertised listeners to derive from listeners/controller.listener.names
+    props.remove(KafkaConfig.AdvertisedListenersProp)
+    assertBadConfigContainingMessage(props, "The listeners" + expectedExceptionContainsTextSuffix)
 
-    // Invalid now
-    props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-    assertFalse(isValidKafkaConfig(props))
+    // Valid now
+    props.put(KafkaConfig.ListenersProp, correctListeners)
+    KafkaConfig.fromProps(props)
   }
 
   @Test
@@ -1108,9 +1279,11 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
   }
 
   @Test
@@ -1159,7 +1332,7 @@ class KafkaConfigTest {
     val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.AdvertisedListenersProp, listeners)
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
   }
 
   @Test
@@ -1185,7 +1358,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ListenersProp, listeners)
     props.put(KafkaConfig.AdvertisedListenersProp, listeners)
     props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
   }
 
   @Test
@@ -1211,9 +1384,10 @@ class KafkaConfigTest {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(KafkaConfig.NodeIdProp, "1")
     props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
   }
 
   @Test
@@ -1223,11 +1397,12 @@ class KafkaConfigTest {
 
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
     props.put(KafkaConfig.LogDirProp, dataDir)
     props.put(KafkaConfig.NodeIdProp, "1")
     props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(metadataDir, config.metadataLogDir)
@@ -1241,10 +1416,11 @@ class KafkaConfigTest {
 
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
     props.put(KafkaConfig.NodeIdProp, "1")
     props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
-    assertTrue(isValidKafkaConfig(props))
+    KafkaConfig.fromProps(props)
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(dataDir1, config.metadataLogDir)
@@ -1299,6 +1475,7 @@ class KafkaConfigTest {
   def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.setProperty(KafkaConfig.BrokerIdProp, "3")
     props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
     val config = KafkaConfig.fromProps(props)
@@ -1313,6 +1490,7 @@ class KafkaConfigTest {
   def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
     val props = new Properties()
     props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.setProperty(KafkaConfig.NodeIdProp, "3")
     props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
     val config = KafkaConfig.fromProps(props)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 58550bf..82ad542 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -39,9 +39,9 @@ class KafkaRaftServerTest {
     val configProperties = new Properties
     configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
-    configProperties.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://127.0.0.1:9092")
-    configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
-    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+    configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
 
     val (loadedMetaProperties, offlineDirs) =
       invokeLoadMetaProperties(metaProperties, configProperties)
@@ -109,6 +109,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
     assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -131,6 +132,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.MetadataLogDirProp, invalidDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, validDir.getAbsolutePath)
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
     assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -153,6 +155,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.MetadataLogDirProp, validDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, invalidDir.getAbsolutePath)
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
     val (loadedProperties, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
@@ -182,6 +185,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath)
     configProperties.put(KafkaConfig.LogDirProp, dataDir.getAbsolutePath)
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
     assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -203,6 +207,7 @@ class KafkaRaftServerTest {
     configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
     configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
     configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
+    configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(configProperties)
 
     assertThrows(classOf[InconsistentBrokerMetadataException],
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 08c0acf..f0003f4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -146,6 +146,7 @@ class ReplicaManagerConcurrencyTest {
     props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.NodeIdProp, localId.toString)
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     props.put(KafkaConfig.LogDirProp, logDir.getAbsolutePath)
     props.put(KafkaConfig.ReplicaLagTimeMaxMsProp, 5000.toString)
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index 79f8cd9..d72ad2d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -35,7 +35,8 @@ class ServerTest {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
     props.put(KafkaConfig.NodeIdProp, nodeId.toString)
-    props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
+    props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9093")
+    props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
     val config = KafkaConfig.fromProps(props)
 
     val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 660797a..99d7639 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
 import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
 import org.apache.kafka.common.errors.{KafkaStorageException, OperationNotAttemptedException, TopicExistsException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
@@ -223,7 +223,7 @@ object TestUtils extends Logging {
       brokers: Seq[B],
       protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
     brokers.map { s =>
-      val listener = s.config.advertisedListeners.find(_.securityProtocol == protocol).getOrElse(
+      val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
         sys.error(s"Could not find listener with security protocol $protocol"))
       formatAddress(listener.host, boundPort(s, protocol))
     }.mkString(",")
@@ -231,7 +231,7 @@ object TestUtils extends Logging {
 
   def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
     brokers.map { s =>
-      val listener = s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse(
+      val listener = s.config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse(
         sys.error(s"Could not find listener with name ${listenerName.value}"))
       formatAddress(listener.host, s.boundPort(listenerName))
     }.mkString(",")
@@ -1949,6 +1949,16 @@ object TestUtils extends Logging {
       s" does not contain expected error message : $message"))
   }
 
+  def assertBadConfigContainingMessage(props: Properties, expectedExceptionContainsText: String): Unit = {
+    try {
+      KafkaConfig.fromProps(props)
+      fail("Expected illegal configuration but instead it was legal")
+    } catch {
+      case caught @ (_: ConfigException | _: IllegalArgumentException) =>
+        assertTrue(caught.getMessage.contains(expectedExceptionContainsText))
+    }
+  }
+
   def totalMetricValue(broker: KafkaBroker, metricName: String): Long = {
     totalMetricValue(broker.metrics, metricName)
   }