Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/27 00:59:09 UTC

[kafka] branch 2.8 updated: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new a61f309  KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)
a61f309 is described below

commit a61f309bf000aa3c3b0b21c176c09e014f81f13e
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Feb 26 19:56:11 2021 -0500

    KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)
    
    Fix some cases where we were erroneously using the configuration of the inter-broker
    listener instead of the controller listener.  Add the sasl.mechanism.controller.protocol
    configuration key specified by KIP-631.  Add some ducktape tests.
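    
    As an illustration (the names and values below are hypothetical, not
    part of this commit), a KIP-500 node whose controller listener speaks
    SASL would now set the controller mechanism explicitly instead of
    inheriting the inter-broker one:
    
        controller.listener.names=CONTROLLER
        listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT
        # new key from KIP-631; defaults to GSSAPI when unset
        sasl.mechanism.controller.protocol=PLAIN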
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, David Arthur <mu...@gmail.com>, Boyang Chen <bo...@confluent.io>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   | 11 ++-
 .../server/BrokerToControllerChannelManager.scala  | 25 ++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  5 ++
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  3 +
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 .../sanity_checks/test_verifiable_producer.py      | 73 ++++++++++++++++
 tests/kafkatest/services/kafka/kafka.py            | 67 ++++++++++++---
 .../services/kafka/templates/kafka.properties      |  7 ++
 .../kafkatest/services/security/security_config.py | 52 ++++++++++--
 tests/kafkatest/tests/core/security_test.py        | 96 +++++++++++++++++-----
 10 files changed, 293 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index ecf8934..1881a1d 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,10 +30,11 @@ import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.ApiMessage
 import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
 import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}
@@ -241,12 +242,14 @@ class KafkaRaftManager[T](
   }
 
   private def buildNetworkClient(): NetworkClient = {
+    val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
+    val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
     val channelBuilder = ChannelBuilders.clientChannelBuilder(
-      config.interBrokerSecurityProtocol,
+      controllerSecurityProtocol,
       JaasContext.Type.SERVER,
       config,
-      config.interBrokerListenerName,
-      config.saslMechanismInterBrokerProtocol,
+      controllerListenerName,
+      config.saslMechanismControllerProtocol,
       time,
       config.saslInterBrokerHandshakeRequestEnable,
       logContext
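
The resolution pattern above deserves a note: the controller listener name is
looked up in listener.security.protocol.map, and only if it is absent is the
listener name itself interpreted as a security protocol name. A minimal
standalone sketch of that fallback (assuming kafka-clients on the classpath;
the listener names here are illustrative, not from this commit):

    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.security.auth.SecurityProtocol

    object ListenerLookupSketch extends App {
      // stand-in for config.listenerSecurityProtocolMap
      val protocolMap = Map(new ListenerName("CONTROLLER") -> SecurityProtocol.SASL_SSL)

      def resolve(listener: ListenerName): SecurityProtocol =
        protocolMap.getOrElse(listener, SecurityProtocol.forName(listener.value()))

      println(resolve(new ListenerName("CONTROLLER"))) // SASL_SSL, found in the map
      println(resolve(new ListenerName("SSL")))        // SSL, via the name fallback
      // resolve(new ListenerName("FOO")) would throw IllegalArgumentException,
      // since FOO is neither mapped nor the name of a security protocol
    }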
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 621c867..16c4a5b 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -40,6 +40,7 @@ trait ControllerNodeProvider {
   def get(): Option[Node]
   def listenerName: ListenerName
   def securityProtocol: SecurityProtocol
+  def saslMechanism: String
 }
 
 object MetadataCacheControllerNodeProvider {
@@ -56,7 +57,8 @@ object MetadataCacheControllerNodeProvider {
     new MetadataCacheControllerNodeProvider(
       metadataCache,
       listenerName,
-      securityProtocol
+      securityProtocol,
+      config.saslMechanismInterBrokerProtocol
     )
   }
 }
@@ -64,7 +66,8 @@ object MetadataCacheControllerNodeProvider {
 class MetadataCacheControllerNodeProvider(
   val metadataCache: kafka.server.MetadataCache,
   val listenerName: ListenerName,
-  val securityProtocol: SecurityProtocol
+  val securityProtocol: SecurityProtocol,
+  val saslMechanism: String
 ) extends ControllerNodeProvider {
   override def get(): Option[Node] = {
     metadataCache.getControllerId
@@ -78,9 +81,16 @@ object RaftControllerNodeProvider {
             config: KafkaConfig,
             controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
 
-    val listenerName = new ListenerName(config.controllerListenerNames.head)
-    val securityProtocol = config.listenerSecurityProtocolMap.getOrElse(listenerName, SecurityProtocol.forName(listenerName.value()))
-    new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, listenerName, securityProtocol)
+    val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
+    val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+    val controllerSaslMechanism = config.saslMechanismControllerProtocol
+    new RaftControllerNodeProvider(
+      metaLogManager,
+      controllerQuorumVoterNodes,
+      controllerListenerName,
+      controllerSecurityProtocol,
+      controllerSaslMechanism
+    )
   }
 }
 
@@ -91,7 +101,8 @@ object RaftControllerNodeProvider {
 class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
                                  controllerQuorumVoterNodes: Seq[Node],
                                  val listenerName: ListenerName,
-                                 val securityProtocol: SecurityProtocol
+                                 val securityProtocol: SecurityProtocol,
+                                 val saslMechanism: String
                                 ) extends ControllerNodeProvider with Logging {
   val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
 
@@ -179,7 +190,7 @@ class BrokerToControllerChannelManagerImpl(
         JaasContext.Type.SERVER,
         config,
         controllerNodeProvider.listenerName,
-        config.saslMechanismInterBrokerProtocol,
+        controllerNodeProvider.saslMechanism,
         time,
         config.saslInterBrokerHandshakeRequestEnable,
         logContext
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d2e3414..39a93aa 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -378,6 +378,7 @@ object KafkaConfig {
   val NodeIdProp = "node.id"
   val MetadataLogDirProp = "metadata.log.dir"
   val ControllerListenerNamesProp = "controller.listener.names"
+  val SaslMechanismControllerProtocolProp = "sasl.mechanism.controller.protocol"
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -675,6 +676,7 @@ object KafkaConfig {
     "KIP-500. If it is not set, the metadata log is placed in the first log directory from log.dirs."
   val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required " +
     "if this process is a KIP-500 controller. The ZK-based controller will not use this configuration."
+  val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI."
 
   /************* Authorizer Configuration ***********/
 val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements ${classOf[Authorizer].getName}" +
@@ -1081,6 +1083,7 @@ object KafkaConfig {
       .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
       .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
       .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
+      .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -1814,6 +1817,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   def controllerListeners: Seq[EndPoint] =
     listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
 
+  def saslMechanismControllerProtocol = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
+
   def controlPlaneListener: Option[EndPoint] = {
     controlPlaneListenerName.map { listenerName =>
       listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
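
A hedged sketch of reading the new key through the accessor added above. The
values are illustrative and KafkaConfig validation on this branch may demand
more (or different) keys than shown; treat this as a sketch of the accessor,
not a known-good KIP-500 configuration:

    import java.util.Properties
    import kafka.server.KafkaConfig

    object ControllerMechanismSketch extends App {
      val props = new Properties()
      props.put("process.roles", "controller")
      props.put("node.id", "3000")
      props.put("controller.quorum.voters", "3000@localhost:9093")
      props.put("controller.listener.names", "CONTROLLER")
      props.put("listeners", "CONTROLLER://localhost:9093")
      props.put("listener.security.protocol.map", "CONTROLLER:SASL_PLAINTEXT")
      props.put("sasl.mechanism.controller.protocol", "PLAIN")
      val config = KafkaConfig.fromProps(props)
      // falls back to SaslConfigs.DEFAULT_SASL_MECHANISM (GSSAPI) when unset
      assert(config.saslMechanismControllerProtocol == "PLAIN")
    }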
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 7544a46..d3bcfef 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.clients.{Metadata, MockClient, NodeApiVersions}
+import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.{Node, Uuid}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
@@ -58,6 +59,8 @@ class BrokerLifecycleManagerTest {
     override def listenerName: ListenerName = new ListenerName("PLAINTEXT")
 
     override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
+
+    override def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
   }
 
   class BrokerLifecycleManagerTestContext(properties: Properties) {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6271105..987a1fd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -779,6 +779,7 @@ class KafkaConfigTest {
         case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string
 
         //Sasl Configs
+        case KafkaConfig.SaslMechanismControllerProtocolProp => // ignore
         case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore
         case KafkaConfig.SaslEnabledMechanismsProp =>
         case KafkaConfig.SaslClientCallbackHandlerClassProp =>
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 32961f1..7fcb603 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -96,4 +96,77 @@ class TestVerifiableProducer(Test):
         num_produced = self.producer.num_acked
         assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
 
+    @cluster(num_nodes=4)
+    @matrix(inter_broker_security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=[quorum.remote_raft])
+    @matrix(inter_broker_security_protocol=['SASL_SSL'], inter_broker_sasl_mechanism=['PLAIN', 'GSSAPI'],
+            metadata_quorum=[quorum.remote_raft])
+    def test_multiple_raft_security_protocols(
+            self, inter_broker_security_protocol, inter_broker_sasl_mechanism='GSSAPI', metadata_quorum=quorum.remote_raft):
+        """
+        Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
+        verify that we can produce a small number of messages.  The inter-controller and broker-to-controller
+        security protocols are defined to be different (which differs from the above test, where they were the same).
+        """
+        self.kafka.security_protocol = self.kafka.interbroker_security_protocol = inter_broker_security_protocol
+        self.kafka.client_sasl_mechanism = self.kafka.interbroker_sasl_mechanism = inter_broker_sasl_mechanism
+        controller_quorum = self.kafka.controller_quorum
+        sasl_mechanism = 'PLAIN' if inter_broker_sasl_mechanism == 'GSSAPI' else 'GSSAPI'
+        if inter_broker_security_protocol == 'PLAINTEXT':
+            controller_security_protocol = 'SSL'
+            intercontroller_security_protocol = 'SASL_SSL'
+        elif inter_broker_security_protocol == 'SSL':
+            controller_security_protocol = 'SASL_SSL'
+            intercontroller_security_protocol = 'PLAINTEXT'
+        else: # inter_broker_security_protocol == 'SASL_SSL'
+            controller_security_protocol = 'PLAINTEXT'
+            intercontroller_security_protocol = 'SSL'
+        controller_quorum.controller_security_protocol = controller_security_protocol
+        controller_quorum.controller_sasl_mechanism = sasl_mechanism
+        controller_quorum.intercontroller_security_protocol = intercontroller_security_protocol
+        controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
+        self.kafka.start()
+
+        node = self.producer.nodes[0]
+        node.version = KafkaVersion(str(DEV_BRANCH))
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+
+        # See the comment above regarding use of version.vstring (distutils.version.LooseVersion)
+        assert is_version(node, [node.version.vstring], logger=self.logger)
+
+        self.producer.wait()
+        num_produced = self.producer.num_acked
+        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+
+    @cluster(num_nodes=4)
+    @parametrize(metadata_quorum=quorum.remote_raft)
+    def test_multiple_raft_sasl_mechanisms(self, metadata_quorum):
+        """
+        Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
+        verify that we can produce a small number of messages.  The inter-controller and broker-to-controller
+        security protocols are both SASL_PLAINTEXT but the SASL mechanisms are different (we set
+        GSSAPI for the inter-controller mechanism and PLAIN for the broker-to-controller mechanism).
+        This test differs from the above tests -- the ones above used the same SASL mechanism for both paths.
+        """
+        self.kafka.security_protocol = self.kafka.interbroker_security_protocol = 'PLAINTEXT'
+        controller_quorum = self.kafka.controller_quorum
+        controller_quorum.controller_security_protocol = 'SASL_PLAINTEXT'
+        controller_quorum.controller_sasl_mechanism = 'PLAIN'
+        controller_quorum.intercontroller_security_protocol = 'SASL_PLAINTEXT'
+        controller_quorum.intercontroller_sasl_mechanism = 'GSSAPI'
+        self.kafka.start()
+
+        node = self.producer.nodes[0]
+        node.version = KafkaVersion(str(DEV_BRANCH))
+        self.producer.start()
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
+             err_msg="Producer failed to start in a reasonable amount of time.")
+
+        # See the comment above regarding use of version.vstring (distutils.version.LooseVersion)
+        assert is_version(node, [node.version.vstring], logger=self.logger)
+
+        self.producer.wait()
+        num_produced = self.producer.num_acked
+        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
 
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 1b8810c..df8f9d9 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -232,7 +232,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param str tls_version: version of the TLS protocol.
         :param str interbroker_security_protocol: security protocol to use for broker-to-broker (and Raft controller-to-controller) communication
         :param str client_sasl_mechanism: sasl mechanism for clients to use
-        :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
+        :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker (and to-controller) communication
         :param str authorizer_class_name: which authorizer class to use
         :param str version: which kafka version to use. Defaults to "dev" branch
         :param jmx_object_names:
@@ -443,14 +443,55 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     @property
     def security_config(self):
         if not self._security_config:
-            client_sasl_mechanism_to_use = self.client_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.controller_sasl_mechanism
-            interbroker_sasl_mechanism_to_use = self.interbroker_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_sasl_mechanism
-            self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
+            # we will later change the security protocols to PLAINTEXT if this is a remote Raft controller case since
+            # those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS
+            security_protocol_to_use=self.security_protocol
+            interbroker_security_protocol_to_use=self.interbroker_security_protocol
+            # determine uses/serves controller sasl mechanisms
+            serves_controller_sasl_mechanism=None
+            serves_intercontroller_sasl_mechanism=None
+            uses_controller_sasl_mechanism=None
+            if self.quorum_info.has_brokers:
+                if self.controller_quorum.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+                    uses_controller_sasl_mechanism = self.controller_quorum.controller_sasl_mechanism
+            if self.quorum_info.has_controllers:
+                if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+                    serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism
+                    uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in co-located case
+                if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+                    serves_controller_sasl_mechanism = self.controller_sasl_mechanism
+            # determine if raft uses TLS
+            raft_tls = False
+            if self.quorum_info.has_brokers and not self.quorum_info.has_controllers:
+                # Raft-based broker only
+                raft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
+            if self.quorum_info.has_controllers:
+                # remote or co-located raft controller
+                raft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \
+                           or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
+            # clear irrelevant security protocols of SASL/TLS implications for remote controller quorum case
+            if self.quorum_info.has_controllers and not self.quorum_info.has_brokers:
+                security_protocol_to_use=SecurityConfig.PLAINTEXT
+                interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT
+
+            self._security_config = SecurityConfig(self.context, security_protocol_to_use, interbroker_security_protocol_to_use,
                                                    zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
-                                                   client_sasl_mechanism=client_sasl_mechanism_to_use,
-                                                   interbroker_sasl_mechanism=interbroker_sasl_mechanism_to_use,
+                                                   client_sasl_mechanism=self.client_sasl_mechanism,
+                                                   interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
                                                    listener_security_config=self.listener_security_config,
-                                                   tls_version=self.tls_version)
+                                                   tls_version=self.tls_version,
+                                                   serves_controller_sasl_mechanism=serves_controller_sasl_mechanism,
+                                                   serves_intercontroller_sasl_mechanism=serves_intercontroller_sasl_mechanism,
+                                                   uses_controller_sasl_mechanism=uses_controller_sasl_mechanism,
+                                                   raft_tls=raft_tls)
+        # Ensure we have the right inter-broker security protocol because it may have been mutated
+        # since we cached our security config (ignore if this is a remote raft controller quorum case; the
+        # inter-broker security protocol is not used there).
+        if (self.quorum_info.using_zk or self.quorum_info.has_brokers) and \
+                self._security_config.interbroker_security_protocol != self.interbroker_security_protocol:
+            self._security_config.interbroker_security_protocol = self.interbroker_security_protocol
+            self._security_config.calc_has_sasl()
+            self._security_config.calc_has_ssl()
         for port in self.port_mappings.values():
             if port.open:
                 self._security_config.enable_security_protocol(port.security_protocol)
@@ -470,9 +511,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.port_mappings[listener_name].open = False
 
     def start_minikdc_if_necessary(self, add_principals=""):
-        has_sasl = self.security_config.has_sasl if self.quorum_info.using_zk else \
-            self.security_config.has_sasl or self.controller_quorum.security_config.has_sasl if self.quorum_info.has_brokers else \
-                self.security_config.has_sasl or self.remote_kafka.security_config.has_sasl
+        has_sasl = self.security_config.has_sasl
         if has_sasl:
             if self.minikdc is None:
                 other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_raft else None
@@ -497,7 +536,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
         if self.quorum_info.has_brokers_and_controllers and (
                 self.controller_security_protocol != self.intercontroller_security_protocol or
-                self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
+                self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS and self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
+            # This is not supported because both the broker and the controller take the first entry from
+            # controller.listener.names and the value from sasl.mechanism.controller.protocol;
+            # they share a single config, so they must both see/use identical values.
             raise Exception("Co-located Raft-based Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
                             (self.controller_security_protocol, self.controller_sasl_mechanism,
                              self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
@@ -666,6 +708,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                                       for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             # define controller.listener.names
             self.controller_listener_names = ','.join(self.controller_listener_name_list())
+            # define sasl.mechanism.controller.protocol to match remote quorum if one exists
+            if self.remote_controller_quorum:
+                self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism
 
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index d7fa2d2..f5c9a74 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -95,6 +95,13 @@ zookeeper.ssl.truststore.password=test-ts-passwd
 {% if quorum_info.using_zk or quorum_info.has_brokers %}
 sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
 {% endif %}
+{% if quorum_info.using_raft %}
+{% if not quorum_info.has_brokers %}
+sasl.mechanism.controller.protocol={{ intercontroller_sasl_mechanism }}
+{% else %}
+sasl.mechanism.controller.protocol={{ controller_quorum.controller_sasl_mechanism }}
+{% endif %}
+{% endif %}
 sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
 sasl.kerberos.service.name=kafka
 {% if authorizer_class_name is not none %}
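
Rendered, the branch above means a remote controller-only node emits the
inter-controller mechanism while a node with broker roles emits the quorum's
broker-to-controller mechanism. With the values from
test_multiple_raft_sasl_mechanisms above, the rendering would come out roughly as:

    # on the remote controller nodes
    sasl.mechanism.controller.protocol=GSSAPI
    # on the broker nodes
    sasl.mechanism.controller.protocol=PLAIN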
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index f68b93d..9ab7e6f 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -120,6 +120,8 @@ class SecurityConfig(TemplateRenderer):
     SSL = 'SSL'
     SASL_PLAINTEXT = 'SASL_PLAINTEXT'
     SASL_SSL = 'SASL_SSL'
+    SASL_SECURITY_PROTOCOLS = [SASL_PLAINTEXT, SASL_SSL]
+    SSL_SECURITY_PROTOCOLS = [SSL, SASL_SSL]
     SASL_MECHANISM_GSSAPI = 'GSSAPI'
     SASL_MECHANISM_PLAIN = 'PLAIN'
     SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256'
@@ -145,7 +147,11 @@ class SecurityConfig(TemplateRenderer):
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
                  zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
-                 listener_security_config=ListenerSecurityConfig(), tls_version=None):
+                 listener_security_config=ListenerSecurityConfig(), tls_version=None,
+                 serves_controller_sasl_mechanism=None, # Raft Controller does this
+                 serves_intercontroller_sasl_mechanism=None, # Raft Controller does this
+                 uses_controller_sasl_mechanism=None, # communication to Raft Controller (broker and controller both do this)
+                 raft_tls=False):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -173,8 +179,17 @@ class SecurityConfig(TemplateRenderer):
         if interbroker_security_protocol is None:
             interbroker_security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol
-        self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
-        self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol) or zk_tls
+        serves_raft_sasl = []
+        if serves_controller_sasl_mechanism is not None:
+            serves_raft_sasl += [serves_controller_sasl_mechanism]
+        if serves_intercontroller_sasl_mechanism is not None:
+            serves_raft_sasl += [serves_intercontroller_sasl_mechanism]
+        self.serves_raft_sasl = set(serves_raft_sasl)
+        uses_raft_sasl = []
+        if uses_controller_sasl_mechanism is not None:
+            uses_raft_sasl += [uses_controller_sasl_mechanism]
+        self.uses_raft_sasl = set(uses_raft_sasl)
+
         self.zk_sasl = zk_sasl
         self.zk_tls = zk_tls
         self.static_jaas_conf = static_jaas_conf
@@ -191,6 +206,7 @@ class SecurityConfig(TemplateRenderer):
             'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
+        self.raft_tls = raft_tls
 
         if tls_version is not None:
             self.properties.update({'tls.version' : tls_version})
@@ -198,6 +214,21 @@ class SecurityConfig(TemplateRenderer):
         self.properties.update(self.listener_security_config.client_listener_overrides)
         self.jaas_override_variables = jaas_override_variables or {}
 
+        self.calc_has_sasl()
+        self.calc_has_ssl()
+
+    def calc_has_sasl(self):
+        self.has_sasl = self.is_sasl(self.properties['security.protocol']) \
+                        or self.is_sasl(self.interbroker_security_protocol) \
+                        or self.zk_sasl \
+                        or self.serves_raft_sasl or self.uses_raft_sasl
+
+    def calc_has_ssl(self):
+        self.has_ssl = self.is_ssl(self.properties['security.protocol']) \
+                       or self.is_ssl(self.interbroker_security_protocol) \
+                       or self.zk_tls \
+                       or self.raft_tls
+
     def client_config(self, template_props="", node=None, jaas_override_variables=None,
                       use_inter_broker_mechanism_for_client = False):
         # If node is not specified, use static jaas config which will be created later.
@@ -315,10 +346,10 @@ class SecurityConfig(TemplateRenderer):
         return value
 
     def is_ssl(self, security_protocol):
-        return security_protocol == SecurityConfig.SSL or security_protocol == SecurityConfig.SASL_SSL
+        return security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
 
     def is_sasl(self, security_protocol):
-        return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL
+        return security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS
 
     def is_sasl_scram(self, sasl_mechanism):
         return sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_256 or sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_512
@@ -341,7 +372,16 @@ class SecurityConfig(TemplateRenderer):
 
     @property
     def enabled_sasl_mechanisms(self):
-        return set([self.client_sasl_mechanism, self.interbroker_sasl_mechanism])
+        sasl_mechanisms = []
+        if self.is_sasl(self.security_protocol):
+            sasl_mechanisms += [self.client_sasl_mechanism]
+        if self.is_sasl(self.interbroker_security_protocol):
+            sasl_mechanisms += [self.interbroker_sasl_mechanism]
+        if self.serves_raft_sasl:
+            sasl_mechanisms += list(self.serves_raft_sasl)
+        if self.uses_raft_sasl:
+            sasl_mechanisms += list(self.uses_raft_sasl)
+        return set(sasl_mechanisms)
 
     @property
     def has_sasl_kerberos(self):
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 8dcc264..0ce12c9 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -57,7 +57,7 @@ class SecurityTest(EndToEndTest):
 
         return True
 
-    @cluster(num_nodes=7)
+    @cluster(num_nodes=6)
     @matrix(security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], metadata_quorum=quorum.all_non_upgrade)
     @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], metadata_quorum=quorum.all_non_upgrade)
     def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk):
@@ -78,34 +78,48 @@ class SecurityTest(EndToEndTest):
 
         self.create_kafka(security_protocol=security_protocol,
                           interbroker_security_protocol=interbroker_security_protocol)
+        if self.kafka.quorum_info.using_raft and interbroker_security_protocol == 'SSL':
+            # we don't want to interfere with communication to the controller quorum
+            # (we separately test this below) so make sure it isn't using TLS
+            # (it uses the inter-broker security information by default)
+            controller_quorum = self.kafka.controller_quorum
+            controller_quorum.controller_security_protocol = 'PLAINTEXT'
+            controller_quorum.intercontroller_security_protocol = 'PLAINTEXT'
         self.kafka.start()
 
         # now set the certs to have invalid hostnames so we can run the actual test
         SecurityConfig.ssl_stores.valid_hostname = False
         self.kafka.restart_cluster()
 
-        # We need more verbose logging to catch the expected errors
-        self.create_and_start_clients(log_level="DEBUG")
+        if self.kafka.quorum_info.using_raft and security_protocol == 'PLAINTEXT':
+            # the inter-broker security protocol using TLS with a hostname verification failure
+            # doesn't impact a producer in case of a single broker with a Raft Controller,
+            # so confirm that this is in fact the observed behavior
+            self.create_and_start_clients(log_level="INFO")
+            self.run_validation()
+        else:
+            # We need more verbose logging to catch the expected errors
+            self.create_and_start_clients(log_level="DEBUG")
 
-        try:
-            wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
+            try:
+                wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
 
-            # Fail quickly if messages are successfully acked
-            raise RuntimeError("Messages published successfully but should not have!"
-                               " Endpoint validation did not fail with invalid hostname")
-        except TimeoutError:
-            # expected
-            pass
+                # Fail quickly if messages are successfully acked
+                raise RuntimeError("Messages published successfully but should not have!"
+                                   " Endpoint validation did not fail with invalid hostname")
+            except TimeoutError:
+                # expected
+                pass
 
-        error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
-        wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=30)
-        self.producer.stop()
-        self.consumer.stop()
+            error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
+            wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=30)
+            self.producer.stop()
+            self.consumer.stop()
 
-        SecurityConfig.ssl_stores.valid_hostname = True
-        self.kafka.restart_cluster()
-        self.create_and_start_clients(log_level="INFO")
-        self.run_validation()
+            SecurityConfig.ssl_stores.valid_hostname = True
+            self.kafka.restart_cluster()
+            self.create_and_start_clients(log_level="INFO")
+            self.run_validation()
 
     def create_and_start_clients(self, log_level):
         self.create_producer(log_level=log_level)
@@ -113,3 +127,47 @@ class SecurityTest(EndToEndTest):
 
         self.create_consumer(log_level=log_level)
         self.consumer.start()
+
+    @cluster(num_nodes=2)
+    @matrix(metadata_quorum=[quorum.zk, quorum.remote_raft])
+    def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk):
+        """
+        Test that invalid hostname in ZooKeeper or Raft Controller results in broker inability to start.
+        """
+        # Start ZooKeeper/Raft-based Controller with valid hostnames in the certs' SANs
+        # so that we can start Kafka
+        SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
+                                                  valid_hostname=True)
+
+        self.create_zookeeper_if_necessary(num_nodes=1,
+                                           zk_client_port = False,
+                                           zk_client_secure_port = True,
+                                           zk_tls_encrypt_only = True,
+                                           )
+        if self.zk:
+            self.zk.start()
+
+        self.create_kafka(num_nodes=1,
+                          interbroker_security_protocol='SSL', # also sets the broker-to-raft-controller security protocol for the Raft case
+                          zk_client_secure=True, # ignored if we aren't using ZooKeeper
+                          )
+        self.kafka.start()
+
+        # now stop the Kafka broker
+        # and set the cert for ZooKeeper/Raft-based Controller to have an invalid hostname
+        # so we can restart Kafka and ensure it is unable to start
+        self.kafka.stop_node(self.kafka.nodes[0])
+
+        SecurityConfig.ssl_stores.valid_hostname = False
+        if quorum.for_test(self.test_context) == quorum.zk:
+            self.kafka.zk.restart_cluster()
+        else:
+            self.kafka.remote_controller_quorum.restart_cluster()
+
+        try:
+            self.kafka.start_node(self.kafka.nodes[0], timeout_sec=30)
+            raise RuntimeError("Kafka restarted successfully but should not have!"
+                               " Endpoint validation did not fail with invalid hostname")
+        except TimeoutError:
+            # expected
+            pass