You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/12/10 21:30:46 UTC
[kafka] branch trunk updated: KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff6cf67 KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)
ff6cf67 is described below
commit ff6cf670f49301a325342b6d7b598a1b462a012c
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Dec 10 16:28:57 2021 -0500
KAFKA-13456; Tighten KRaft listener config checks/constraints (#11503)
This patch tightens the configuration checks related to KRaft configs by adding the following constraints:
* `control.plane.listener.name` is confirmed to be empty in KRaft mode whenever a config object is created as opposed to later when the broker is given the config and tries to start.
* `controller.listener.names` is required to be empty for the non-KRaft (i.e. ZooKeeper) case. A ZooKeeper-based cluster that sets this config will fail to restart until this config is removed.
* There must be no advertised listeners when running just a KRaft controller (i.e. when `process.roles=controller`). This means neither `listeners` nor `advertised.listeners` (if the latter is explicitly defined) can contain a listener that does not also appear in `controller.listener.names`.
* When running a KRaft broker (i.e. when `process.roles=broker` or `process.roles=broker,controller`), the advertised listeners (which were already checked to be non-empty via the check that the inter-broker listener appears there) must not include any listeners appearing in `controller.listener.names`.
* When running a KRaft controller (i.e. when `process.roles=controller` or `process.roles=broker,controller`), `controller.listener.names` must be non-empty and every entry in it must appear in `listeners`.
* When running just a KRaft broker (i.e. when `process.roles=broker`), `controller.listener.names` must be non-empty and none of its entries can appear in `listeners`. This was indirectly checked previously, but the indirect checks did not catch all cases.
* When running just a KRaft broker we log a warning if more than one entry appears in `controller.listener.names` because only the first entry is used.
* We also map configured controller listener names to the `PLAINTEXT` security protocol by default, provided that `listener.security.protocol.map` is not explicitly set and no other security protocol (SSL or SASL_*) is in use.
Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/cluster/EndPoint.scala | 10 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 6 +-
.../server/BrokerToControllerChannelManager.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 169 ++++++++---
core/src/main/scala/kafka/server/KafkaServer.scala | 6 +-
.../server/DynamicBrokerReconfigurationTest.scala | 8 +-
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 3 +-
.../test/scala/unit/kafka/KafkaConfigTest.scala | 42 ++-
.../unit/kafka/network/ConnectionQuotasTest.scala | 1 -
.../unit/kafka/network/SocketServerTest.scala | 3 +-
.../scala/unit/kafka/raft/RaftManagerTest.scala | 14 +-
.../kafka/server/BrokerLifecycleManagerTest.scala | 3 +-
.../unit/kafka/server/ControllerApisTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 1 +
.../scala/unit/kafka/server/KafkaConfigTest.scala | 330 ++++++++++++++++-----
.../unit/kafka/server/KafkaRaftServerTest.scala | 11 +-
.../server/ReplicaManagerConcurrencyTest.scala | 1 +
.../test/scala/unit/kafka/server/ServerTest.scala | 3 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 16 +-
21 files changed, 482 insertions(+), 159 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 2f8229a..3e84f9e 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -17,11 +17,12 @@
package kafka.cluster
-import org.apache.kafka.common.{Endpoint => JEndpoint, KafkaException}
+import org.apache.kafka.common.{KafkaException, Endpoint => JEndpoint}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
+import java.util.Locale
import scala.collection.Map
object EndPoint {
@@ -57,6 +58,13 @@ object EndPoint {
case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
}
}
+
+ def parseListenerName(connectionString: String): String = {
+ connectionString match {
+ case uriParseExp(listenerNameString, _, _) => listenerNameString.toUpperCase(Locale.ROOT)
+ case _ => throw new KafkaException(s"Unable to parse a listener name from $connectionString")
+ }
+ }
}
/**
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 279e596..4c29250 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -227,7 +227,7 @@ class KafkaRaftManager[T](
private def buildNetworkClient(): NetworkClient = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
- val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+ val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val channelBuilder = ChannelBuilders.clientChannelBuilder(
controllerSecurityProtocol,
JaasContext.Type.SERVER,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 839ee17..0871030 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -324,7 +324,7 @@ class BrokerServer(
metadataSnapshotter)
val networkListeners = new ListenerCollection()
- config.advertisedListeners.foreach { ep =>
+ config.effectiveAdvertisedListeners.foreach { ep =>
networkListeners.add(new Listener().
setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
setName(ep.listenerName.value()).
@@ -404,10 +404,6 @@ class BrokerServer(
config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
SocketServer.DataPlaneThreadPrefix)
- if (socketServer.controlPlaneRequestChannelOpt.isDefined) {
- throw new RuntimeException(KafkaConfig.ControlPlaneListenerNameProp + " is not " +
- "supported when in KRaft mode.")
- }
// Block until we've caught up with the latest metadata from the controller quorum.
lifecycleManager.initialCatchUpFuture.get()
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 132a2cc..b671c70 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -82,7 +82,7 @@ object RaftControllerNodeProvider {
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
- val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+ val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
raftManager,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index cf8021c..fef9f19 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -910,12 +910,12 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
throw new ConfigException("Dynamic reconfiguration of listeners is not yet supported when using a Raft-based metadata quorum")
}
val newListeners = listenersToMap(newConfig.listeners)
- val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners)
+ val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
- if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
- throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.listenerSecurityProtocolMap}'")
+ if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
+ throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
@@ -926,7 +926,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != immutableListenerConfigs(oldConfig, listenerName.configPrefix))
throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
"restart broker or create a new listener for update")
- if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName))
+ if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
}
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a789b24..65bc35e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,7 +27,9 @@ import kafka.log.LogConfig
import kafka.log.LogConfig.MessageFormatVersion
import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
import kafka.security.authorizer.AuthorizerUtils
+import kafka.server.KafkaConfig.{ControllerListenerNamesProp, ListenerSecurityProtocolMapProp}
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
+import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
@@ -748,7 +750,9 @@ object KafkaConfig {
"Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " +
"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
"INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
- "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+ "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). " +
+ "Note that in KRaft a default mapping from the listener names defined by <code>controller.listener.names</code> to PLAINTEXT " +
+ "is assumed if no explicit mapping is provided and no other security protocol is in use."
val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
"For example, if a broker's config is :\n" +
@@ -1887,15 +1891,21 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
def listeners: Seq[EndPoint] =
- CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), listenerSecurityProtocolMap)
+ CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp), effectiveListenerSecurityProtocolMap)
- def controllerListenerNames: Seq[String] =
- Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")
+ def controllerListenerNames: Seq[String] = {
+ val value = Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("")
+ if (value.isEmpty) {
+ Seq.empty
+ } else {
+ value.split(",")
+ }
+ }
def controllerListeners: Seq[EndPoint] =
listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
- def saslMechanismControllerProtocol = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
+ def saslMechanismControllerProtocol: String = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
def controlPlaneListener: Option[EndPoint] = {
controlPlaneListenerName.map { listenerName =>
@@ -1912,10 +1922,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
// Use advertised listeners if defined, fallback to listeners otherwise
- def advertisedListeners: Seq[EndPoint] = {
+ def effectiveAdvertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
if (advertisedListenersProp != null)
- CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap, requireDistinctPorts=false)
+ CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
else
listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}
@@ -1927,7 +1937,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
case Some(name) =>
val listenerName = ListenerName.normalised(name)
- val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+ val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
(listenerName, securityProtocol)
@@ -1942,7 +1952,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
case Some(name) =>
val listenerName = ListenerName.normalised(name)
- val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+ val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
Some(listenerName, securityProtocol)
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
}
- def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
- getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+ def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+ val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
.map { case (listenerName, protocolName) =>
- ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+ ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+ }
+ if (usesSelfManagedQuorum && !originals.containsKey(ListenerSecurityProtocolMapProp)) {
+ // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value,
+ // and we are using KRaft.
+ // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
+ def isSslOrSasl(name: String): Boolean = name.equals(SecurityProtocol.SSL.name) || name.equals(SecurityProtocol.SASL_SSL.name) || name.equals(SecurityProtocol.SASL_PLAINTEXT.name)
+ // check controller listener names (they won't appear in listeners when process.roles=broker)
+ // as well as listeners for occurrences of SSL or SASL_*
+ if (controllerListenerNames.exists(isSslOrSasl) ||
+ parseCsvList(getString(KafkaConfig.ListenersProp)).exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
+ mapValue // don't add default mappings since we found something that is SSL or SASL_*
+ } else {
+ // add the PLAINTEXT mappings for all controller listener names that are not explicitly PLAINTEXT
+ mapValue ++ controllerListenerNames.filter(!SecurityProtocol.PLAINTEXT.name.equals(_)).map(
+ new ListenerName(_) -> SecurityProtocol.PLAINTEXT)
+ }
+ } else {
+ mapValue
}
}
@@ -1992,18 +2020,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
-
- // Validate process.roles with controller.quorum.voters
- val voterIds: Set[Integer] = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet
- if (voterIds.isEmpty) {
- throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
- } else if (processRoles.contains(ControllerRole)) {
- // Ensure that controllers use their node.id as a voter in controller.quorum.voters
- require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
- } else {
- // Ensure that the broker's node.id is not an id in controller.quorum.voters
- require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
- }
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -2018,12 +2034,103 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
"offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
" Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
- require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
- s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
+ val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet
+
+ // validate KRaft-related configs
+ val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+ def validateNonEmptyQuorumVotersForKRaft(): Unit = {
+ if (voterAddressSpecsByNodeId.isEmpty) {
+ throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+ }
+ }
+ def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+ require(controlPlaneListenerName.isEmpty,
+ s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+ }
+ def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
+ require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+ s"The advertised.listeners config must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
+ }
+ def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
+ require(voterAddressSpecsByNodeId.containsKey(nodeId),
+ s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+ }
+ def validateControllerListenerExistsForKRaftController(): Unit = {
+ require(controllerListeners.nonEmpty,
+ s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+ }
+ def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
+ val listenerNameValues = listeners.map(_.listenerName.value).toSet
+ require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
+ s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+ }
+ def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+ require(advertisedListenerNames.nonEmpty,
+ "There must be at least one advertised listener." + (
+ if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+ }
+ if (processRoles == Set(BrokerRole)) {
+ // KRaft broker-only
+ validateNonEmptyQuorumVotersForKRaft()
+ validateControlPlaneListenerEmptyForKRaft()
+ validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+ // nodeId must not appear in controller.quorum.voters
+ require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+ s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+ // controller.listener.names must be non-empty...
+ require(controllerListenerNames.nonEmpty,
+ s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role")
+ // controller.listener.names are forbidden in listeners...
+ require(controllerListeners.isEmpty,
+ s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role")
+ // controller.listener.names must all appear in listener.security.protocol.map
+ controllerListenerNames.foreach { name =>
+ val listenerName = ListenerName.normalised(name)
+ if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) {
+ throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
+ s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp} (an explicit security mapping for each controller listener is required if ${KafkaConfig.ListenerSecurityProtocolMapProp} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
+ }
+ }
+ // warn that only the first controller listener is used if there is more than one
+ if (controllerListenerNames.size > 1) {
+ warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple entries; only the first will be used since ${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava}")
+ }
+ validateAdvertisedListenersNonEmptyForBroker()
+ } else if (processRoles == Set(ControllerRole)) {
+ // KRaft controller-only
+ validateNonEmptyQuorumVotersForKRaft()
+ validateControlPlaneListenerEmptyForKRaft()
+ // advertised listeners must be empty when not also running the broker role
+ val sourceOfAdvertisedListeners: String =
+ if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+ s"${KafkaConfig.AdvertisedListenersProp}"
+ else
+ s"${KafkaConfig.ListenersProp}"
+ require(effectiveAdvertisedListeners.isEmpty,
+ s"The $sourceOfAdvertisedListeners config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+ validateControllerQuorumVotersMustContainNodeIdForKRaftController()
+ validateControllerListenerExistsForKRaftController()
+ validateControllerListenerNamesMustAppearInListenersForKRaftController()
+ } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+ // KRaft colocated broker and controller
+ validateNonEmptyQuorumVotersForKRaft()
+ validateControlPlaneListenerEmptyForKRaft()
+ validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+ validateControllerQuorumVotersMustContainNodeIdForKRaftController()
+ validateControllerListenerExistsForKRaftController()
+ validateControllerListenerNamesMustAppearInListenersForKRaftController()
+ validateAdvertisedListenersNonEmptyForBroker()
+ } else {
+ // ZK-based
+ // controller listener names must be empty when not in KRaft mode
+ require(controllerListenerNames.isEmpty, s"${KafkaConfig.ControllerListenerNamesProp} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
+ validateAdvertisedListenersNonEmptyForBroker()
+ }
- val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+ // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
+ validateAdvertisedListenersNonEmptyForBroker()
require(advertisedListenerNames.contains(interBrokerListenerName),
s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
@@ -2034,15 +2141,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
)
}
- require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
+ require(!effectiveAdvertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
- // Ensure controller listeners are not in the advertised listeners list
- require(!controllerListeners.exists(advertisedListeners.contains),
- s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of ${KafkaConfig.ControllerListenerNamesProp}")
-
- // validate controller.listener.name config
+ // validate control.plane.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9f5674e..416895e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -497,14 +497,14 @@ class KafkaServer(
}
def createBrokerInfo: BrokerInfo = {
- val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+ val endPoints = config.effectiveAdvertisedListeners.map(e => s"${e.host}:${e.port}")
zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker =>
val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints)
require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" +
s" advertised listeners are already registered by broker ${broker.id}")
}
- val listeners = config.advertisedListeners.map { endpoint =>
+ val listeners = config.effectiveAdvertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
else
@@ -808,7 +808,7 @@ class KafkaServer(
/** Return advertised listeners with the bound port (this may differ from the configured port if the latter is `0`). */
def advertisedListeners: Seq[EndPoint] = {
- config.advertisedListeners.map { endPoint =>
+ config.effectiveAdvertisedListeners.map { endPoint =>
endPoint.copy(port = boundPort(endPoint.listenerName))
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 0c0950a..687cd9e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1095,7 +1095,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val listeners = config.listeners
.map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
.mkString(",") + s",$listenerName://localhost:0"
- val listenerMap = config.listenerSecurityProtocolMap
+ val listenerMap = config.effectiveListenerSecurityProtocolMap
.map { case (name, protocol) => s"${name.value}:${protocol.name}" }
.mkString(",") + s",$listenerName:${securityProtocol.name}"
@@ -1174,7 +1174,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
.filter(e => e.listenerName.value != securityProtocol.name)
.map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
.mkString(",")
- val listenerMap = config.listenerSecurityProtocolMap
+ val listenerMap = config.effectiveListenerSecurityProtocolMap
.filter { case (listenerName, _) => listenerName.value != securityProtocol.name }
.map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" }
.mkString(",")
@@ -1364,7 +1364,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private def alterAdvertisedListener(adminClient: Admin, externalAdminClient: Admin, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
- val newListeners = server.config.advertisedListeners.map { e =>
+ val newListeners = server.config.effectiveAdvertisedListeners.map { e =>
if (e.listenerName.value == SecureExternal)
s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
else
@@ -1376,7 +1376,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
adminClient.alterConfigs(configs).all.get
servers.foreach { server =>
TestUtils.retry(10000) {
- val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal)
+ val externalListener = server.config.effectiveAdvertisedListeners.find(_.listenerName.value == SecureExternal)
.getOrElse(throw new IllegalStateException("External listener not found"))
assertEquals(newHost, externalListener.host, "Config not updated")
}
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 526c499..94bf453 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -59,8 +59,9 @@ final class KafkaMetadataLogTest {
def testConfig(): Unit = {
val props = new Properties()
props.put(ProcessRolesProp, util.Arrays.asList("broker"))
- props.put(QuorumVotersProp, "1@localhost:9092")
+ props.put(QuorumVotersProp, "1@localhost:9093")
props.put(NodeIdProp, Int.box(2))
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(MetadataLogSegmentBytesProp, Int.box(10240))
props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index bf056e8..8c8d3e7 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -20,10 +20,9 @@ import java.io.File
import java.nio.file.Files
import java.util
import java.util.Properties
-
import kafka.server.KafkaConfig
import kafka.utils.Exit
-import org.apache.kafka.common.config.ConfigException
+import kafka.utils.TestUtils.assertBadConfigContainingMessage
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
@@ -89,7 +88,8 @@ class KafkaTest {
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
setListenerProps(propertiesFile)
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+ assertBadConfigContainingMessage(propertiesFile,
+ "If process.roles contains just the 'broker' role, the node id 1 must not be included in the set of voters")
// Ensure that with a valid config no exception is thrown
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -104,7 +104,8 @@ class KafkaTest {
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
setListenerProps(propertiesFile)
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+ assertBadConfigContainingMessage(propertiesFile,
+ "If process.roles contains the 'controller' role, the node id 1 must be included in the set of voters")
// Ensure that with a valid config no exception is thrown
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -119,7 +120,8 @@ class KafkaTest {
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
setListenerProps(propertiesFile)
- assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+ assertBadConfigContainingMessage(propertiesFile,
+ "If process.roles contains the 'controller' role, the node id 1 must be included in the set of voters")
// Ensure that with a valid config no exception is thrown
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
@@ -134,21 +136,39 @@ class KafkaTest {
propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
setListenerProps(propertiesFile)
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+ assertBadConfigContainingMessage(propertiesFile,
+ "If using process.roles, controller.quorum.voters must contain a parseable set of voters.")
// Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
- assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+ assertBadConfigContainingMessage(propertiesFile,
+ "Missing required configuration `zookeeper.connect` which has no default value.")
- // Ensure that no exception is thrown once zookeeper.connect is defined
+ // Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ propertiesFile.setProperty(KafkaConfig.ControllerListenerNamesProp, "")
KafkaConfig.fromProps(propertiesFile)
}
private def setListenerProps(props: Properties): Unit = {
- props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
- props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
- if (props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")) {
+ val hasBrokerRole = props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")
+ val hasControllerRole = props.getProperty(KafkaConfig.ProcessRolesProp).contains("controller")
+ val controllerListener = "SASL_PLAINTEXT://localhost:9092"
+ val brokerListener = "PLAINTEXT://localhost:9093"
+
+ if (hasBrokerRole || hasControllerRole) { // KRaft
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SASL_PLAINTEXT")
+ if (hasBrokerRole && hasControllerRole) {
+ props.setProperty(KafkaConfig.ListenersProp, s"$brokerListener,$controllerListener")
+ } else if (hasControllerRole) {
+ props.setProperty(KafkaConfig.ListenersProp, controllerListener)
+ } else if (hasBrokerRole) {
+ props.setProperty(KafkaConfig.ListenersProp, brokerListener)
+ }
+ } else { // ZK-based
+ props.setProperty(KafkaConfig.ListenersProp, brokerListener)
+ }
+ if (!(hasControllerRole & !hasBrokerRole)) { // not controller-only
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
}
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 10f04d8..2ebcc37 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -395,7 +395,6 @@ class ConnectionQuotasTest {
val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
val props = brokerPropsWithDefaultConnectionLimits
props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
val config = KafkaConfig.fromProps(props)
connectionQuotas = new ConnectionQuotas(config, time, metrics)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 27d1f9a..77a5766 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -296,7 +296,7 @@ class SocketServerTest {
val testableServer = new TestableSocketServer(config)
testableServer.startup(startProcessingRequests = false)
- val updatedEndPoints = config.advertisedListeners.map { endpoint =>
+ val updatedEndPoints = config.effectiveAdvertisedListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)
@@ -1169,7 +1169,6 @@ class SocketServerTest {
def testClientDisconnectionWithOutstandingReceivesProcessedUntilFailedSend(): Unit = {
val serverMetrics = new Metrics
@volatile var selector: TestableSelector = null
- props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
val overrideServer = new SocketServer(
KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider, apiVersionManager
) {
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 827d42e..a7a9519 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -43,18 +43,18 @@ class RaftManagerTest {
props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
props.setProperty(KafkaConfig.NodeIdProp, nodeId)
- props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
- props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
if (processRoles.contains("broker")) {
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
- props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
- if (!processRoles.contains("controller")) {
+ if (processRoles.contains("controller")) { // co-located
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+ } else { // broker-only
val voterId = (nodeId.toInt + 1)
props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
}
- }
-
- if (processRoles.contains("controller")) {
+ } else if (processRoles.contains("controller")) { // controller-only
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d97724d..dd3e49d 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -49,6 +49,7 @@ class BrokerLifecycleManagerTest {
properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
properties.setProperty(KafkaConfig.NodeIdProp, "1")
properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
+ properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
properties
}
@@ -80,7 +81,7 @@ class BrokerLifecycleManagerTest {
time, controllerNodeProvider, nodeApiVersions)
val clusterId = "x4AJGXQSRnephtTZzujw4w"
val advertisedListeners = new ListenerCollection()
- config.advertisedListeners.foreach { ep =>
+ config.effectiveAdvertisedListeners.foreach { ep =>
advertisedListeners.add(new Listener().setHost(ep.host).
setName(ep.listenerName.value()).
setPort(ep.port.shortValue()).
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 318e17e..2176d23 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -95,7 +95,7 @@ class ControllerApisTest {
props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
- props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+ props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
new ControllerApis(
requestChannel,
authorizer,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 777857f..98c2666 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -135,6 +135,7 @@ class KafkaApisTest {
properties.put(KafkaConfig.ProcessRolesProp, "broker")
val voterId = (brokerId + 1)
properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
+ properties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
properties
} else {
TestUtils.createBrokerConfig(brokerId, "zk")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bda39e3..f204af2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,6 +21,7 @@ import kafka.api.{ApiVersion, KAFKA_0_8_2, KAFKA_3_0_IV1}
import kafka.cluster.EndPoint
import kafka.log.LogConfig
import kafka.message._
+import kafka.utils.TestUtils.assertBadConfigContainingMessage
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.{ConfigException, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
@@ -155,7 +156,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props)
- val endpoints = serverConfig.advertisedListeners
+ val endpoints = serverConfig.effectiveAdvertisedListeners
assertEquals(1, endpoints.size)
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, hostName)
@@ -171,7 +172,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
val serverConfig = KafkaConfig.fromProps(props)
- val endpoints = serverConfig.advertisedListeners
+ val endpoints = serverConfig.effectiveAdvertisedListeners
val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
assertEquals(endpoint.host, advertisedHostName)
@@ -186,34 +187,31 @@ class KafkaConfigTest {
// listeners with duplicate port
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
- var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("Each listener must have a different port"))
+ assertBadConfigContainingMessage(props, "Each listener must have a different port")
// listeners with duplicate name
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
- caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("Each listener must have a different name"))
+ assertBadConfigContainingMessage(props, "Each listener must have a different name")
// advertised listeners can have duplicate ports
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
// but not duplicate names
props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
- caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("Each listener must have a different name"))
+ assertBadConfigContainingMessage(props, "Each listener must have a different name")
}
@Test
- def testControlPlaneListenerName() = {
+ def testControlPlaneListenerName(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
props.put("control.plane.listener.name", "CONTROLLER")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
val serverConfig = KafkaConfig.fromProps(props)
val controlEndpoint = serverConfig.controlPlaneListener.get
@@ -222,10 +220,10 @@ class KafkaConfigTest {
assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
//advertised listener should contain control-plane listener
- val advertisedEndpoints = serverConfig.advertisedListeners
- assertFalse(advertisedEndpoints.filter { endpoint =>
+ val advertisedEndpoints = serverConfig.effectiveAdvertisedListeners
+ assertTrue(advertisedEndpoints.exists { endpoint =>
endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
- }.isEmpty)
+ })
// interBrokerListener name should be different from control-plane listener name
val interBrokerListenerName = serverConfig.interBrokerListenerName
@@ -233,21 +231,16 @@ class KafkaConfigTest {
}
@Test
- def testControllerListenerName() = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
- props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
- props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")
+ def testControllerListenerNames(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
- assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
val serverConfig = KafkaConfig.fromProps(props)
- val controlPlaneEndpoint = serverConfig.controlPlaneListener.get
- assertEquals("localhost", controlPlaneEndpoint.host)
- assertEquals(4000, controlPlaneEndpoint.port)
- assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol)
-
val controllerEndpoints = serverConfig.controllerListeners
assertEquals(1, controllerEndpoints.size)
val controllerEndpoint = controllerEndpoints.iterator.next()
@@ -257,19 +250,167 @@ class KafkaConfigTest {
}
@Test
- def testControllerListenerDefined(): Unit = {
+ def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.put(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
+
+ assertFalse(isValidKafkaConfig(props))
+ assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
+
+ props.remove(KafkaConfig.ControlPlaneListenerNameProp)
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testControllerListenerDefinedForKRaftController(): Unit = {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "controller")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+
+ assertBadConfigContainingMessage(props, "The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
+
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ KafkaConfig.fromProps(props)
+
+ // confirm that redirecting via listener.security.protocol.map is acceptable
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
+ props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testControllerListenerDefinedForKRaftBroker(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+
+ assertFalse(isValidKafkaConfig(props))
+ assertBadConfigContainingMessage(props, "controller.listener.names must contain at least one value when running KRaft with just the broker role")
+
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ KafkaConfig.fromProps(props)
+
+ // confirm that redirecting via listener.security.protocol.map is acceptable
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "controller,broker")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
+ KafkaConfig.fromProps(props)
+
+ // change each of the 4 ports to port 5555 -- should pass in all circumstances since we can't validate the
+ // controller.quorum.voters ports (which are the ports that clients use and are semantically "advertised" ports
+ // even though the controller configuration doesn't list them in advertised.listeners) against the
+ // listener ports (which are semantically different than the ports that clients use).
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
+ KafkaConfig.fromProps(props)
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
+ KafkaConfig.fromProps(props)
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
+ KafkaConfig.fromProps(props)
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testSeparateControllerListenerDefinedForKRaftBrokerController(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
- val caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("controller.listener.names cannot be empty if the server has the controller role"))
+ assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ KafkaConfig.fromProps(props)
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
- assertTrue(isValidKafkaConfig(props))
+ // confirm that redirecting via listener.security.protocol.map is acceptable
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testControllerListenerNameMapsToPlaintextByDefaultForKRaft(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ val controllerListenerName = new ListenerName("CONTROLLER")
+ assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+ // ensure we don't map it to PLAINTEXT when there is an SSL or SASL controller listener
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
+ val controllerNotFoundInMapMessage = "Controller listener with name CONTROLLER defined in controller.listener.names not found in listener.security.protocol.map"
+ assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+ // ensure we don't map it to PLAINTEXT when there is an SSL or SASL listener
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.put(KafkaConfig.ListenersProp, "SSL://localhost:9092")
+ assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+ props.remove(KafkaConfig.ListenersProp)
+ // ensure we don't map it to PLAINTEXT when it is explicitly mapped otherwise
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ assertEquals(Some(SecurityProtocol.SSL),
+ KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
+ // ensure we don't map it to PLAINTEXT when anything is explicitly given
+ // (i.e. it is only part of the default value, even with KRaft)
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+ assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
+ // ensure we can map it to a non-PLAINTEXT security protocol by default (i.e. when nothing is given)
+ props.remove(KafkaConfig.ListenerSecurityProtocolMapProp)
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ assertEquals(Some(SecurityProtocol.SSL),
+ KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("SSL")))
+ }
+
+ @Test
+ def testMultipleControllerListenerNamesMapToPlaintextByDefaultForKRaft(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "controller")
+ props.put(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
+ props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+ assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER1")))
+ assertEquals(Some(SecurityProtocol.PLAINTEXT),
+ KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER2")))
+ }
+
+ @Test
+ def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.BrokerIdProp, "1")
+ props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
+ assertBadConfigContainingMessage(props,
+ "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
+ // Valid now
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+ assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
}
@Test
@@ -297,13 +438,13 @@ class KafkaConfigTest {
EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
assertEquals(expectedListeners, config.listeners)
- assertEquals(expectedListeners, config.advertisedListeners)
+ assertEquals(expectedListeners, config.effectiveAdvertisedListeners)
val expectedSecurityProtocolMap = Map(
new ListenerName("CLIENT") -> SecurityProtocol.SSL,
new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
)
- assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap)
+ assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
}
@Test
@@ -328,13 +469,13 @@ class KafkaConfigTest {
EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
)
- assertEquals(expectedAdvertisedListeners, config.advertisedListeners)
+ assertEquals(expectedAdvertisedListeners, config.effectiveAdvertisedListeners)
val expectedSecurityProtocolMap = Map(
new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
)
- assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap)
+ assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
}
@Test
@@ -382,7 +523,7 @@ class KafkaConfigTest {
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
}
- def listenerListToEndPoints(listenerList: String,
+ private def listenerListToEndPoints(listenerList: String,
securityProtocolMap: collection.Map[ListenerName, SecurityProtocol] = EndPoint.DefaultSecurityProtocolMap) =
CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
@@ -396,7 +537,7 @@ class KafkaConfigTest {
val conf = KafkaConfig.fromProps(props)
assertEquals(listenerListToEndPoints("PLAINTEXT://:9092"), conf.listeners)
assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
- assertEquals(conf.advertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
+ assertEquals(conf.effectiveAdvertisedListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
}
@nowarn("cat=deprecation")
@@ -540,12 +681,10 @@ class KafkaConfigTest {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
- var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("No security protocol defined for listener TRACE"))
+ assertBadConfigContainingMessage(props, "No security protocol defined for listener TRACE")
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
- caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
- assertTrue(caught.getMessage.contains("advertised.listeners listener names must be equal to or a subset of the ones defined in listeners"))
+ assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
}
@nowarn("cat=deprecation")
@@ -951,7 +1090,7 @@ class KafkaConfigTest {
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
- assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.advertisedListeners.map(_.connectionString))
+ assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedListeners.map(_.connectionString))
assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
@@ -984,7 +1123,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.MaxConnectionsPerIpProp, "0")
assertFalse(isValidKafkaConfig(props))
props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
assertFalse(isValidKafkaConfig(props))
}
@@ -1000,46 +1139,78 @@ class KafkaConfigTest {
}
}
- def assertDistinctControllerAndAdvertisedListeners(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+ @Test
+ def testDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+ props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+
+ // invalid due to extra listener also appearing in controller listeners
+ assertBadConfigContainingMessage(props,
+ "controller.listener.names must not contain a value appearing in the 'listeners' configuration when running KRaft with just the broker role")
+
// Valid now
- assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+ KafkaConfig.fromProps(props)
- // Still valid
- val controllerListeners = "SASL_SSL"
- props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
- assertTrue(isValidKafkaConfig(props))
+ // Also valid if we let advertised listeners be derived from listeners/controller.listener.names
+ // since listeners and advertised.listeners are explicitly identical at this point
+ props.remove(KafkaConfig.AdvertisedListenersProp)
+ KafkaConfig.fromProps(props)
}
@Test
- def assertAllControllerListenerCannotBeAdvertised(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ def testControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+ props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ assertBadConfigContainingMessage(props,
+ "The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role")
+
// Valid now
- assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+ KafkaConfig.fromProps(props)
- // Invalid now
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL,SASL_SSL")
- assertFalse(isValidKafkaConfig(props))
+ // Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
+ props.remove(KafkaConfig.AdvertisedListenersProp)
+ KafkaConfig.fromProps(props)
}
@Test
- def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
- val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+ def testAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.ProcessRolesProp, "controller")
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
- // Valid now
- assertTrue(isValidKafkaConfig(props))
+ props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+ props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+ props.put(KafkaConfig.NodeIdProp, "2")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ val expectedExceptionContainsTextSuffix = " config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller"
+ assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+
+ // Still invalid due to extra listener if we set advertised listeners explicitly to be correct
+ val correctListeners = "PLAINTEXT://A:9092,SSL://B:9093"
+ props.put(KafkaConfig.AdvertisedListenersProp, correctListeners)
+ assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+
+ // Still invalid due to extra listener if we allow advertised listeners to derive from listeners/controller.listener.names
+ props.remove(KafkaConfig.AdvertisedListenersProp)
+ assertBadConfigContainingMessage(props, "The listeners" + expectedExceptionContainsTextSuffix)
- // Invalid now
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
- assertFalse(isValidKafkaConfig(props))
+ // Valid now
+ props.put(KafkaConfig.ListenersProp, correctListeners)
+ KafkaConfig.fromProps(props)
}
@Test
@@ -1108,9 +1279,11 @@ class KafkaConfigTest {
val largeBrokerId = 2000
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
}
@Test
@@ -1159,7 +1332,7 @@ class KafkaConfigTest {
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
}
@Test
@@ -1185,7 +1358,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
}
@Test
@@ -1211,9 +1384,10 @@ class KafkaConfigTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(KafkaConfig.NodeIdProp, "1")
props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
}
@Test
@@ -1223,11 +1397,12 @@ class KafkaConfigTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
props.put(KafkaConfig.LogDirProp, dataDir)
props.put(KafkaConfig.NodeIdProp, "1")
props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
assertEquals(metadataDir, config.metadataLogDir)
@@ -1241,10 +1416,11 @@ class KafkaConfigTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
props.put(KafkaConfig.NodeIdProp, "1")
props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
- assertTrue(isValidKafkaConfig(props))
+ KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
assertEquals(dataDir1, config.metadataLogDir)
@@ -1299,6 +1475,7 @@ class KafkaConfigTest {
def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.setProperty(KafkaConfig.BrokerIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val config = KafkaConfig.fromProps(props)
@@ -1313,6 +1490,7 @@ class KafkaConfigTest {
def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.setProperty(KafkaConfig.NodeIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9093")
val config = KafkaConfig.fromProps(props)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 58550bf..82ad542 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -39,9 +39,9 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
- configProperties.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://127.0.0.1:9092")
- configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
- configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val (loadedMetaProperties, offlineDirs) =
invokeLoadMetaProperties(metaProperties, configProperties)
@@ -109,6 +109,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -131,6 +132,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.MetadataLogDirProp, invalidDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, validDir.getAbsolutePath)
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -153,6 +155,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.MetadataLogDirProp, validDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, invalidDir.getAbsolutePath)
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
val (loadedProperties, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
@@ -182,6 +185,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, dataDir.getAbsolutePath)
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
@@ -203,6 +207,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(configProperties)
assertThrows(classOf[InconsistentBrokerMetadataException],
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 08c0acf..f0003f4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -146,6 +146,7 @@ class ReplicaManagerConcurrencyTest {
props.put(KafkaConfig.QuorumVotersProp, "100@localhost:12345")
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.NodeIdProp, localId.toString)
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.put(KafkaConfig.LogDirProp, logDir.getAbsolutePath)
props.put(KafkaConfig.ReplicaLagTimeMaxMsProp, 5000.toString)
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index 79f8cd9..d72ad2d 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -35,7 +35,8 @@ class ServerTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.NodeIdProp, nodeId.toString)
- props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
+ props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
val config = KafkaConfig.fromProps(props)
val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 660797a..99d7639 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
import org.apache.kafka.common.errors.{KafkaStorageException, OperationNotAttemptedException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.header.Header
@@ -223,7 +223,7 @@ object TestUtils extends Logging {
brokers: Seq[B],
protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
brokers.map { s =>
- val listener = s.config.advertisedListeners.find(_.securityProtocol == protocol).getOrElse(
+ val listener = s.config.effectiveAdvertisedListeners.find(_.securityProtocol == protocol).getOrElse(
sys.error(s"Could not find listener with security protocol $protocol"))
formatAddress(listener.host, boundPort(s, protocol))
}.mkString(",")
@@ -231,7 +231,7 @@ object TestUtils extends Logging {
def bootstrapServers[B <: KafkaBroker](brokers: Seq[B], listenerName: ListenerName): String = {
brokers.map { s =>
- val listener = s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse(
+ val listener = s.config.effectiveAdvertisedListeners.find(_.listenerName == listenerName).getOrElse(
sys.error(s"Could not find listener with name ${listenerName.value}"))
formatAddress(listener.host, s.boundPort(listenerName))
}.mkString(",")
@@ -1949,6 +1949,16 @@ object TestUtils extends Logging {
s" does not contain expected error message : $message"))
}
+ def assertBadConfigContainingMessage(props: Properties, expectedExceptionContainsText: String): Unit = {
+ try {
+ KafkaConfig.fromProps(props)
+ fail("Expected illegal configuration but instead it was legal")
+ } catch {
+ case caught @ (_: ConfigException | _: IllegalArgumentException) =>
+ assertTrue(caught.getMessage.contains(expectedExceptionContainsText))
+ }
+ }
+
def totalMetricValue(broker: KafkaBroker, metricName: String): Long = {
totalMetricValue(broker.metrics, metricName)
}