Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/22 15:51:54 UTC

[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints

rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r754409736



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -2018,12 +2019,68 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
       " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty,
-      s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role")
-
     val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    if (usesSelfManagedQuorum) {
+      require(controlPlaneListenerName.isEmpty,
+        s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode.")
+      val sourceOfAdvertisedListeners = if (getString(KafkaConfig.AdvertisedListenersProp) != null)
+        s"${KafkaConfig.AdvertisedListenersProp}"
+      else
+        s"${KafkaConfig.ListenersProp}"
+      if (!processRoles.contains(BrokerRole)) {
+        // advertised listeners must be empty when not also running the broker role
+        require(advertisedListeners.isEmpty,
+          sourceOfAdvertisedListeners +
+            s" must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+      } else {
+        // when running broker role advertised listeners cannot contain controller listeners
+        require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())),
+          sourceOfAdvertisedListeners +
+            s" must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role")
+      }
+      if (processRoles.contains(ControllerRole)) {
+        // has controller role (and optionally broker role as well)
+        // controller.listener.names must be non-empty
+        // every one must appear in listeners
+        // the port appearing in controller.quorum.voters for this node must match the port of the first controller listener
+        // (we allow other nodes' voter ports to differ to support running multiple controllers on the same host)
+        require(controllerListeners.nonEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+        val listenerNameValues = listeners.map(_.listenerName.value).toSet
+        require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)),
+          s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role")
+        val addressSpecForThisNode = RaftConfig.parseVoterConnections(quorumVoters).get(nodeId)
+        addressSpecForThisNode match {
+          case inetAddressSpec: RaftConfig.InetAddressSpec => {
+            val quorumVotersPort = inetAddressSpec.address.getPort
+            require(controllerListeners.head.port == quorumVotersPort,
+              s"Port in ${KafkaConfig.QuorumVotersProp} for this controller node (${KafkaConfig.NodeIdProp}=$nodeId, port=$quorumVotersPort) does not match the port for the first controller listener in ${KafkaConfig.ControllerListenerNamesProp} (${controllerListeners.head.listenerName.value()}, port=${controllerListeners.head.port})")
+          }
+          case _ =>
+        }
+      } else {
+        // only broker role
+        // controller.listener.names must be non-empty
+        // none of them can appear in listeners
+        // warn that only the first one is used if there is more than one
+        require(controllerListenerNames.exists(_.nonEmpty),
+          s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role")
+        if (controllerListenerNames.size > 1) {
+          warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple entries; only the first will be used since ${KafkaConfig.ProcessRolesProp}=broker: $controllerListenerNames")
+        }
+        require(controllerListeners.isEmpty,
+          s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role")

Review comment:
       > Are we already checking this...
   
   This check applies only to the KRaft broker-only setup and confirms that no entry in `controller.listener.names` appears in `listeners`.
   
   The check you referred to applies to both KRaft broker setups (i.e. broker-only and co-located) and confirms that no entry in `controller.listener.names` appears in `advertised.listeners`.
   
   Every entry in `controller.listener.names` must appear in `listeners` for the KRaft controller setups (i.e. controller-only and co-located); we check this elsewhere. The `advertised.listeners` config must be empty for the controller-only setup, which we also check elsewhere. Additionally, for all broker setups (i.e. ZooKeeper, KRaft broker-only, and KRaft co-located), the `advertised.listeners` config must be non-empty, must contain the inter-broker listener, and must be a subset of `listeners`.
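   
   To make the rules above concrete, here is a minimal sketch of them as plain set checks. It is not the code in this PR: `ListenerSetup` and `ListenerRules` are hypothetical names, listeners are reduced to bare names (no ports or security protocols), and an empty `roles` set is assumed to mean ZooKeeper mode.
   
   ```scala
   // Hypothetical, simplified model of the listener-related configs.
   final case class ListenerSetup(
     roles: Set[String],                   // process.roles; empty is assumed to mean ZooKeeper mode
     listeners: Set[String],               // listener names from listeners
     advertisedListeners: Set[String],     // listener names from advertised.listeners
     controllerListenerNames: Seq[String], // controller.listener.names
     interBrokerListener: String           // inter-broker listener name
   )
   
   object ListenerRules {
     // Returns one message per violated rule; an empty list means the setup passes.
     def violations(c: ListenerSetup): List[String] = {
       val isKRaft = c.roles.nonEmpty
       val hasBroker = !isKRaft || c.roles.contains("broker")
       val hasController = c.roles.contains("controller")
   
       // Each pair is (rule is violated, message).
       val rules: List[(Boolean, String)] = List(
         (isKRaft && !hasBroker && c.advertisedListeners.nonEmpty,
           "controller-only: advertised.listeners must be empty"),
         (isKRaft && hasBroker && c.controllerListenerNames.exists(c.advertisedListeners.contains),
           "KRaft broker setups: advertised.listeners must not contain controller listeners"),
         (hasController && !c.controllerListenerNames.forall(c.listeners.contains),
           "KRaft controller setups: every controller listener must appear in listeners"),
         (isKRaft && !hasController && c.controllerListenerNames.exists(c.listeners.contains),
           "KRaft broker-only: no controller listener may appear in listeners"),
         (hasBroker && c.advertisedListeners.isEmpty,
           "broker setups: advertised.listeners must be non-empty"),
         (hasBroker && !c.advertisedListeners.contains(c.interBrokerListener),
           "broker setups: advertised.listeners must contain the inter-broker listener"),
         (hasBroker && !c.advertisedListeners.subsetOf(c.listeners),
           "broker setups: advertised.listeners must be a subset of listeners")
       )
       rules.collect { case (true, message) => message }
     }
   }
   ```
   
   For example, a broker-only setup with `listeners` and `advertised.listeners` both set to `PLAINTEXT` and `controller.listener.names=CONTROLLER` passes; adding `CONTROLLER` to `listeners` trips only the broker-only rule.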
   
   I've structured the code around the setups we have to check (and will push a commit shortly to add comments that make this clear).  So we have these sections:
   
   ```
   // validations for all 3 KRaft setups (co-located, controller-only, broker-only)
   ...
   // validations for KRaft controller-only setup
   ...
   // validations for both KRaft broker setups (i.e. broker-only and co-located)
   ...
   // validations for both KRaft controller setups (i.e. controller-only and co-located)
   ...
   // validations for KRaft broker-only setup
   ...
   // validations for ZooKeeper (i.e. non-KRaft) setup
   ...
   // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
   ```
   
   The other way to do it would be to check the 7 different possibilities for each config key/value, but I suspect that wouldn't read as well.
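   
   As a rough illustration of the setup-oriented structure, the sketch below classifies a node from its `process.roles` value and reports which of the sections listed above would run for it. The `Setup` type and its members are hypothetical names used only here, and it assumes an empty `process.roles` means a ZooKeeper setup.
   
   ```scala
   // Hypothetical classification of the four setups; not part of KafkaConfig.
   sealed abstract class Setup(val kraft: Boolean, val broker: Boolean, val controller: Boolean)
   
   object Setup {
     case object ZooKeeper      extends Setup(kraft = false, broker = true,  controller = false)
     case object BrokerOnly     extends Setup(kraft = true,  broker = true,  controller = false)
     case object ControllerOnly extends Setup(kraft = true,  broker = false, controller = true)
     case object CoLocated      extends Setup(kraft = true,  broker = true,  controller = true)
   
     // Assumption: no roles configured means ZooKeeper mode.
     def fromRoles(roles: Set[String]): Setup =
       (roles.contains("broker"), roles.contains("controller")) match {
         case (true, true)   => CoLocated
         case (true, false)  => BrokerOnly
         case (false, true)  => ControllerOnly
         case (false, false) => ZooKeeper
       }
   
     // The seven sections, in order, each paired with the setups it applies to.
     val sections: List[(String, Setup => Boolean)] = List(
       ("all 3 KRaft setups",           (s: Setup) => s.kraft),
       ("KRaft controller-only setup",  (s: Setup) => s == ControllerOnly),
       ("both KRaft broker setups",     (s: Setup) => s.kraft && s.broker),
       ("both KRaft controller setups", (s: Setup) => s.controller),
       ("KRaft broker-only setup",      (s: Setup) => s == BrokerOnly),
       ("ZooKeeper setup",              (s: Setup) => !s.kraft),
       ("all broker setups",            (s: Setup) => s.broker)
     )
   
     // Names of the sections whose validations would run for the given setup.
     def sectionsFor(setup: Setup): List[String] =
       sections.collect { case (name, applies) if applies(setup) => name }
   }
   ```
   
   With this layout each validation lives in exactly one section, and a given node only passes through the sections that match its setup.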



