You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/01 00:36:59 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints

hachikuji commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r759757667



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1960,9 +1962,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   }
 
   def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+    val mapValue = getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
       .map { case (listenerName, protocolName) =>
-      ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+        ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
+      }
+    if (usesSelfManagedQuorum && !originals.containsKey(ListenerSecurityProtocolMapProp)) {
+      // Nothing was specified explicitly, so we are using the default value;
+      // therefore, since we are using KRaft, add the CONTROLLER:PLAINTEXT mapping
+      mapValue ++ Map(new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT)

Review comment:
       Why do we hard-code the "CONTROLLER" listener? Shouldn't we be using the values in `controller.listener.names`?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2015,104 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateCanParseControllerQuorumVotersForKRaft(): Unit = {

Review comment:
       nit: The name is inaccurate. The parsing is done on the previous line. Maybe it should be `validateNonEmptyQuorumVoters` or something like that.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2015,104 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateCanParseControllerQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def sourceOfAdvertisedListeners: String = {

Review comment:
       nit: seems like this could be a `val`

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2015,104 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+
+    // validate KRaft-related configs
+    val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters)
+    def validateCanParseControllerQuorumVotersForKRaft(): Unit = {
+      if (voterAddressSpecsByNodeId.isEmpty) {
+        throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+      }
+    }
+    def validateControlPlaneListenerEmptyForKRaft(): Unit = {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.")
+    }
+    def sourceOfAdvertisedListeners: String = {
+      if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+    }
+    def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = {
+      require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+        s"$sourceOfAdvertisedListeners must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.")
+    }
+    def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): Unit = {
+      require(voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+    }
+    def validateControllerListenerExistsForKRaftController(): Unit = {
+      require(controllerListeners.nonEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = {
+      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
+        s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+    }
+    def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
+      require(advertisedListenerNames.nonEmpty,
+        "There must be at least one advertised listener." + (
+          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else ""))
+    }
+    if (processRoles == Set(BrokerRole)) {
+      // KRaft broker-only
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      // nodeId must not appear in controller.quorum.voters
+      require(!voterAddressSpecsByNodeId.containsKey(nodeId),
+        s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}")
+      // controller.listener.names must be non-empty...
+      require(controllerListenerNames.exists(_.nonEmpty),
+        s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role")
+      // controller.listener.names are forbidden in listeners...
+      require(controllerListeners.isEmpty,
+        s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role")
+      // controller.listener.names must all appear in listener.security.protocol.map
+      controllerListenerNames.filter(_.nonEmpty).foreach { name =>
+        val listenerName = ListenerName.normalised(name)
+        if (!listenerSecurityProtocolMap.contains(listenerName)) {
+          throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " +
+            s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.")
+        }
+      }
+      // warn that only the first controller listener is used if there is more than one
+      if (controllerListenerNames.size > 1) {
+        warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple entries; only the first will be used since ${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava.toString}")
+      }
+      validateAdvertisedListenersNonEmptyForBroker()
+    } else if (processRoles == Set(ControllerRole)) {
+      // KRaft controller-only
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      // advertised listeners must be empty when not also running the broker role
+      require(advertisedListeners.isEmpty,
+        s"$sourceOfAdvertisedListeners must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+      validateControllerQuorumVotersMustContainNodeIDForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+    } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+      // KRaft colocated broker and controller
+      validateCanParseControllerQuorumVotersForKRaft()
+      validateControlPlaneListenerEmptyForKRaft()
+      validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker()
+      validateControllerQuorumVotersMustContainNodeIDForKRaftController()
+      validateControllerListenerExistsForKRaftController()
+      validateControllerListenerNamesMustAppearInListenersForKRaftController()
+      validateAdvertisedListenersNonEmptyForBroker()

Review comment:
       Correct me if I'm wrong, but I think the validations here imply that mixed nodes _must_ specify `advertised.listeners`. If the user didn't provide `advertised.listeners`, then we would take the effective advertised listeners from `listeners`, which would have to include the controller listener. In other words, this would be invalid:
   ```
   process.roles=broker,controller
   controller.listener.names=CONTROLLER
   listeners=DEFAULT://:9092,CONTROLLER://:9093
   ```
   
   Instead, we would have to do this:
   ```
   process.roles=broker,controller
   controller.listener.names=CONTROLLER
   listeners=DEFAULT://:9092,CONTROLLER://:9093
   advertised.listeners=DEFAULT://:9092
   ```
   
   Does that seem right? An alternative approach we could use here would be to automatically exclude the controller listeners from the advertised listeners when `advertised.listeners` is not set and the advertised listeners are therefore derived from `listeners`. The configuration itself gives us enough information to do the right thing.
   
   By the way, would it make sense to rename `advertisedListeners` to `effectiveAdvertisedListeners` to make it clearer that the source might not be `advertised.listeners`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org