You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/27 00:59:09 UTC
[kafka] branch 2.8 updated: KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new a61f309 KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)
a61f309 is described below
commit a61f309bf000aa3c3b0b21c176c09e014f81f13e
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Feb 26 19:56:11 2021 -0500
KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (#10199)
Fix some cases where we were erroneously using the configuration of the inter broker
listener instead of the controller listener. Add the sasl.mechanism.controller.protocol
configuration key specified by KIP-631. Add some ducktape tests.
Reviewers: Colin P. McCabe <cm...@apache.org>, David Arthur <mu...@gmail.com>, Boyang Chen <bo...@confluent.io>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 11 ++-
.../server/BrokerToControllerChannelManager.scala | 25 ++++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 ++
.../kafka/server/BrokerLifecycleManagerTest.scala | 3 +
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../sanity_checks/test_verifiable_producer.py | 73 ++++++++++++++++
tests/kafkatest/services/kafka/kafka.py | 67 ++++++++++++---
.../services/kafka/templates/kafka.properties | 7 ++
.../kafkatest/services/security/security_config.py | 52 ++++++++++--
tests/kafkatest/tests/core/security_test.py | 96 +++++++++++++++++-----
10 files changed, 293 insertions(+), 47 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index ecf8934..1881a1d 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,10 +30,11 @@ import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}
@@ -241,12 +242,14 @@ class KafkaRaftManager[T](
}
private def buildNetworkClient(): NetworkClient = {
+ val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
+ val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val channelBuilder = ChannelBuilders.clientChannelBuilder(
- config.interBrokerSecurityProtocol,
+ controllerSecurityProtocol,
JaasContext.Type.SERVER,
config,
- config.interBrokerListenerName,
- config.saslMechanismInterBrokerProtocol,
+ controllerListenerName,
+ config.saslMechanismControllerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 621c867..16c4a5b 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -40,6 +40,7 @@ trait ControllerNodeProvider {
def get(): Option[Node]
def listenerName: ListenerName
def securityProtocol: SecurityProtocol
+ def saslMechanism: String
}
object MetadataCacheControllerNodeProvider {
@@ -56,7 +57,8 @@ object MetadataCacheControllerNodeProvider {
new MetadataCacheControllerNodeProvider(
metadataCache,
listenerName,
- securityProtocol
+ securityProtocol,
+ config.saslMechanismInterBrokerProtocol
)
}
}
@@ -64,7 +66,8 @@ object MetadataCacheControllerNodeProvider {
class MetadataCacheControllerNodeProvider(
val metadataCache: kafka.server.MetadataCache,
val listenerName: ListenerName,
- val securityProtocol: SecurityProtocol
+ val securityProtocol: SecurityProtocol,
+ val saslMechanism: String
) extends ControllerNodeProvider {
override def get(): Option[Node] = {
metadataCache.getControllerId
@@ -78,9 +81,16 @@ object RaftControllerNodeProvider {
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
- val listenerName = new ListenerName(config.controllerListenerNames.head)
- val securityProtocol = config.listenerSecurityProtocolMap.getOrElse(listenerName, SecurityProtocol.forName(listenerName.value()))
- new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, listenerName, securityProtocol)
+ val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
+ val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
+ val controllerSaslMechanism = config.saslMechanismControllerProtocol
+ new RaftControllerNodeProvider(
+ metaLogManager,
+ controllerQuorumVoterNodes,
+ controllerListenerName,
+ controllerSecurityProtocol,
+ controllerSaslMechanism
+ )
}
}
@@ -91,7 +101,8 @@ object RaftControllerNodeProvider {
class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
- val securityProtocol: SecurityProtocol
+ val securityProtocol: SecurityProtocol,
+ val saslMechanism: String
) extends ControllerNodeProvider with Logging {
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
@@ -179,7 +190,7 @@ class BrokerToControllerChannelManagerImpl(
JaasContext.Type.SERVER,
config,
controllerNodeProvider.listenerName,
- config.saslMechanismInterBrokerProtocol,
+ controllerNodeProvider.saslMechanism,
time,
config.saslInterBrokerHandshakeRequestEnable,
logContext
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d2e3414..39a93aa 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -378,6 +378,7 @@ object KafkaConfig {
val NodeIdProp = "node.id"
val MetadataLogDirProp = "metadata.log.dir"
val ControllerListenerNamesProp = "controller.listener.names"
+ val SaslMechanismControllerProtocolProp = "sasl.mechanism.controller.protocol"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -675,6 +676,7 @@ object KafkaConfig {
"KIP-500. If it is not set, the metadata log is placed in the first log directory from log.dirs."
val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required " +
"if this process is a KIP-500 controller. The ZK-based controller will not use this configuration."
+ val SaslMechanismControllerProtocolDoc = "SASL mechanism used for communication with controllers. Default is GSSAPI."
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
@@ -1081,6 +1083,7 @@ object KafkaConfig {
.defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
.defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
.defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)
+ .defineInternal(SaslMechanismControllerProtocolProp, STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, null, HIGH, SaslMechanismControllerProtocolDoc)
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -1814,6 +1817,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def controllerListeners: Seq[EndPoint] =
listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
+ def saslMechanismControllerProtocol = getString(KafkaConfig.SaslMechanismControllerProtocolProp)
+
def controlPlaneListener: Option[EndPoint] = {
controlPlaneListenerName.map { listenerName =>
listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 7544a46..d3bcfef 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.clients.{Metadata, MockClient, NodeApiVersions}
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.{Node, Uuid}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
@@ -58,6 +59,8 @@ class BrokerLifecycleManagerTest {
override def listenerName: ListenerName = new ListenerName("PLAINTEXT")
override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
+
+ override def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
}
class BrokerLifecycleManagerTestContext(properties: Properties) {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 6271105..987a1fd 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -779,6 +779,7 @@ class KafkaConfigTest {
case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string
//Sasl Configs
+ case KafkaConfig.SaslMechanismControllerProtocolProp => // ignore
case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore
case KafkaConfig.SaslEnabledMechanismsProp =>
case KafkaConfig.SaslClientCallbackHandlerClassProp =>
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 32961f1..7fcb603 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -96,4 +96,77 @@ class TestVerifiableProducer(Test):
num_produced = self.producer.num_acked
assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+ @cluster(num_nodes=4)
+ @matrix(inter_broker_security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=[quorum.remote_raft])
+ @matrix(inter_broker_security_protocol=['SASL_SSL'], inter_broker_sasl_mechanism=['PLAIN', 'GSSAPI'],
+ metadata_quorum=[quorum.remote_raft])
+ def test_multiple_raft_security_protocols(
+ self, inter_broker_security_protocol, inter_broker_sasl_mechanism='GSSAPI', metadata_quorum=quorum.remote_raft):
+ """
+ Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
+ verify that we can produce a small number of messages. The inter-controller and broker-to-controller
+ security protocols are defined to be different (which differs from the above test, where they were the same).
+ """
+ self.kafka.security_protocol = self.kafka.interbroker_security_protocol = inter_broker_security_protocol
+ self.kafka.client_sasl_mechanism = self.kafka.interbroker_sasl_mechanism = inter_broker_sasl_mechanism
+ controller_quorum = self.kafka.controller_quorum
+ sasl_mechanism = 'PLAIN' if inter_broker_sasl_mechanism == 'GSSAPI' else 'GSSAPI'
+ if inter_broker_security_protocol == 'PLAINTEXT':
+ controller_security_protocol = 'SSL'
+ intercontroller_security_protocol = 'SASL_SSL'
+ elif inter_broker_security_protocol == 'SSL':
+ controller_security_protocol = 'SASL_SSL'
+ intercontroller_security_protocol = 'PLAINTEXT'
+ else: # inter_broker_security_protocol == 'SASL_SSL'
+ controller_security_protocol = 'PLAINTEXT'
+ intercontroller_security_protocol = 'SSL'
+ controller_quorum.controller_security_protocol = controller_security_protocol
+ controller_quorum.controller_sasl_mechanism = sasl_mechanism
+ controller_quorum.intercontroller_security_protocol = intercontroller_security_protocol
+ controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
+ self.kafka.start()
+
+ node = self.producer.nodes[0]
+ node.version = KafkaVersion(str(DEV_BRANCH))
+ self.producer.start()
+ wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
+ err_msg="Producer failed to start in a reasonable amount of time.")
+
+ # See the comment above regarding use of version.vstring (distutils.version.LooseVersion)
+ assert is_version(node, [node.version.vstring], logger=self.logger)
+
+ self.producer.wait()
+ num_produced = self.producer.num_acked
+ assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+
+ @cluster(num_nodes=4)
+ @parametrize(metadata_quorum=quorum.remote_raft)
+ def test_multiple_raft_sasl_mechanisms(self, metadata_quorum):
+ """
+ Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
+ verify that we can produce a small number of messages. The inter-controller and broker-to-controller
+ security protocols are both SASL_PLAINTEXT but the SASL mechanisms are different (we set
+ GSSAPI for the inter-controller mechanism and PLAIN for the broker-to-controller mechanism).
+ This test differs from the above tests -- the ones above used the same SASL mechanism for both paths.
+ """
+ self.kafka.security_protocol = self.kafka.interbroker_security_protocol = 'PLAINTEXT'
+ controller_quorum = self.kafka.controller_quorum
+ controller_quorum.controller_security_protocol = 'SASL_PLAINTEXT'
+ controller_quorum.controller_sasl_mechanism = 'PLAIN'
+ controller_quorum.intercontroller_security_protocol = 'SASL_PLAINTEXT'
+ controller_quorum.intercontroller_sasl_mechanism = 'GSSAPI'
+ self.kafka.start()
+
+ node = self.producer.nodes[0]
+ node.version = KafkaVersion(str(DEV_BRANCH))
+ self.producer.start()
+ wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
+ err_msg="Producer failed to start in a reasonable amount of time.")
+
+ # See the comment above regarding use of version.vstring (distutils.version.LooseVersion)
+ assert is_version(node, [node.version.vstring], logger=self.logger)
+
+ self.producer.wait()
+ num_produced = self.producer.num_acked
+ assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 1b8810c..df8f9d9 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -232,7 +232,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param str tls_version: version of the TLS protocol.
:param str interbroker_security_protocol: security protocol to use for broker-to-broker (and Raft controller-to-controller) communication
:param str client_sasl_mechanism: sasl mechanism for clients to use
- :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
+ :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker (and to-controller) communication
:param str authorizer_class_name: which authorizer class to use
:param str version: which kafka version to use. Defaults to "dev" branch
:param jmx_object_names:
@@ -443,14 +443,55 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property
def security_config(self):
if not self._security_config:
- client_sasl_mechanism_to_use = self.client_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.controller_sasl_mechanism
- interbroker_sasl_mechanism_to_use = self.interbroker_sasl_mechanism if self.quorum_info.using_zk or self.quorum_info.has_brokers else self.intercontroller_sasl_mechanism
- self._security_config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
+ # we will later change the security protocols to PLAINTEXT if this is a remote Raft controller case since
+ # those security protocols are irrelevant there and we don't want to falsely indicate the use of SASL or TLS
+ security_protocol_to_use=self.security_protocol
+ interbroker_security_protocol_to_use=self.interbroker_security_protocol
+ # determine uses/serves controller sasl mechanisms
+ serves_controller_sasl_mechanism=None
+ serves_intercontroller_sasl_mechanism=None
+ uses_controller_sasl_mechanism=None
+ if self.quorum_info.has_brokers:
+ if self.controller_quorum.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+ uses_controller_sasl_mechanism = self.controller_quorum.controller_sasl_mechanism
+ if self.quorum_info.has_controllers:
+ if self.intercontroller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+ serves_intercontroller_sasl_mechanism = self.intercontroller_sasl_mechanism
+ uses_controller_sasl_mechanism = self.intercontroller_sasl_mechanism # won't change from above in co-located case
+ if self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS:
+ serves_controller_sasl_mechanism = self.controller_sasl_mechanism
+ # determine if raft uses TLS
+ raft_tls = False
+ if self.quorum_info.has_brokers and not self.quorum_info.has_controllers:
+ # Raft-based broker only
+ raft_tls = self.controller_quorum.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
+ if self.quorum_info.has_controllers:
+ # remote or co-located raft controller
+ raft_tls = self.controller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS \
+ or self.intercontroller_security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
+ # clear irrelevant security protocols of SASL/TLS implications for remote controller quorum case
+ if self.quorum_info.has_controllers and not self.quorum_info.has_brokers:
+ security_protocol_to_use=SecurityConfig.PLAINTEXT
+ interbroker_security_protocol_to_use=SecurityConfig.PLAINTEXT
+
+ self._security_config = SecurityConfig(self.context, security_protocol_to_use, interbroker_security_protocol_to_use,
zk_sasl=self.zk.zk_sasl if self.quorum_info.using_zk else False, zk_tls=self.zk_client_secure,
- client_sasl_mechanism=client_sasl_mechanism_to_use,
- interbroker_sasl_mechanism=interbroker_sasl_mechanism_to_use,
+ client_sasl_mechanism=self.client_sasl_mechanism,
+ interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
listener_security_config=self.listener_security_config,
- tls_version=self.tls_version)
+ tls_version=self.tls_version,
+ serves_controller_sasl_mechanism=serves_controller_sasl_mechanism,
+ serves_intercontroller_sasl_mechanism=serves_intercontroller_sasl_mechanism,
+ uses_controller_sasl_mechanism=uses_controller_sasl_mechanism,
+ raft_tls=raft_tls)
+ # Ensure we have the right inter-broker security protocol because it may have been mutated
+ # since we cached our security config (ignore if this is a remote raft controller quorum case; the
+ # inter-broker security protocol is not used there).
+ if (self.quorum_info.using_zk or self.quorum_info.has_brokers) and \
+ self._security_config.interbroker_security_protocol != self.interbroker_security_protocol:
+ self._security_config.interbroker_security_protocol = self.interbroker_security_protocol
+ self._security_config.calc_has_sasl()
+ self._security_config.calc_has_ssl()
for port in self.port_mappings.values():
if port.open:
self._security_config.enable_security_protocol(port.security_protocol)
@@ -470,9 +511,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.port_mappings[listener_name].open = False
def start_minikdc_if_necessary(self, add_principals=""):
- has_sasl = self.security_config.has_sasl if self.quorum_info.using_zk else \
- self.security_config.has_sasl or self.controller_quorum.security_config.has_sasl if self.quorum_info.has_brokers else \
- self.security_config.has_sasl or self.remote_kafka.security_config.has_sasl
+ has_sasl = self.security_config.has_sasl
if has_sasl:
if self.minikdc is None:
other_service = self.remote_kafka if self.remote_kafka else self.controller_quorum if self.quorum_info.using_raft else None
@@ -497,7 +536,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
if self.quorum_info.has_brokers_and_controllers and (
self.controller_security_protocol != self.intercontroller_security_protocol or
- self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
+ self.controller_security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS and self.controller_sasl_mechanism != self.intercontroller_sasl_mechanism):
+ # This is not supported because both the broker and the controller take the first entry from
+ # controller.listener.names and the value from sasl.mechanism.controller.protocol;
+ # they share a single config, so they must both see/use identical values.
raise Exception("Co-located Raft-based Brokers (%s/%s) and Controllers (%s/%s) cannot talk to Controllers via different security protocols" %
(self.controller_security_protocol, self.controller_sasl_mechanism,
self.intercontroller_security_protocol, self.intercontroller_sasl_mechanism))
@@ -666,6 +708,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list())
+ # define sasl.mechanism.controller.protocol to match remote quorum if one exists
+ if self.remote_controller_quorum:
+ self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index d7fa2d2..f5c9a74 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -95,6 +95,13 @@ zookeeper.ssl.truststore.password=test-ts-passwd
{% if quorum_info.using_zk or quorum_info.has_brokers %}
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
{% endif %}
+{% if quorum_info.using_raft %}
+{% if not quorum_info.has_brokers %}
+sasl.mechanism.controller.protocol={{ intercontroller_sasl_mechanism }}
+{% else %}
+sasl.mechanism.controller.protocol={{ controller_quorum.controller_sasl_mechanism }}
+{% endif %}
+{% endif %}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index f68b93d..9ab7e6f 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -120,6 +120,8 @@ class SecurityConfig(TemplateRenderer):
SSL = 'SSL'
SASL_PLAINTEXT = 'SASL_PLAINTEXT'
SASL_SSL = 'SASL_SSL'
+ SASL_SECURITY_PROTOCOLS = [SASL_PLAINTEXT, SASL_SSL]
+ SSL_SECURITY_PROTOCOLS = [SSL, SASL_SSL]
SASL_MECHANISM_GSSAPI = 'GSSAPI'
SASL_MECHANISM_PLAIN = 'PLAIN'
SASL_MECHANISM_SCRAM_SHA_256 = 'SCRAM-SHA-256'
@@ -145,7 +147,11 @@ class SecurityConfig(TemplateRenderer):
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
- listener_security_config=ListenerSecurityConfig(), tls_version=None):
+ listener_security_config=ListenerSecurityConfig(), tls_version=None,
+ serves_controller_sasl_mechanism=None, # Raft Controller does this
+ serves_intercontroller_sasl_mechanism=None, # Raft Controller does this
+ uses_controller_sasl_mechanism=None, # communication to Raft Controller (broker and controller both do this)
+ raft_tls=False):
"""
Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol
@@ -173,8 +179,17 @@ class SecurityConfig(TemplateRenderer):
if interbroker_security_protocol is None:
interbroker_security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
- self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
- self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol) or zk_tls
+ serves_raft_sasl = []
+ if serves_controller_sasl_mechanism is not None:
+ serves_raft_sasl += [serves_controller_sasl_mechanism]
+ if serves_intercontroller_sasl_mechanism is not None:
+ serves_raft_sasl += [serves_intercontroller_sasl_mechanism]
+ self.serves_raft_sasl = set(serves_raft_sasl)
+ uses_raft_sasl = []
+ if uses_controller_sasl_mechanism is not None:
+ uses_raft_sasl += [uses_controller_sasl_mechanism]
+ self.uses_raft_sasl = set(uses_raft_sasl)
+
self.zk_sasl = zk_sasl
self.zk_tls = zk_tls
self.static_jaas_conf = static_jaas_conf
@@ -191,6 +206,7 @@ class SecurityConfig(TemplateRenderer):
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
}
+ self.raft_tls = raft_tls
if tls_version is not None:
self.properties.update({'tls.version' : tls_version})
@@ -198,6 +214,21 @@ class SecurityConfig(TemplateRenderer):
self.properties.update(self.listener_security_config.client_listener_overrides)
self.jaas_override_variables = jaas_override_variables or {}
+ self.calc_has_sasl()
+ self.calc_has_ssl()
+
+ def calc_has_sasl(self):
+ self.has_sasl = self.is_sasl(self.properties['security.protocol']) \
+ or self.is_sasl(self.interbroker_security_protocol) \
+ or self.zk_sasl \
+ or self.serves_raft_sasl or self.uses_raft_sasl
+
+ def calc_has_ssl(self):
+ self.has_ssl = self.is_ssl(self.properties['security.protocol']) \
+ or self.is_ssl(self.interbroker_security_protocol) \
+ or self.zk_tls \
+ or self.raft_tls
+
def client_config(self, template_props="", node=None, jaas_override_variables=None,
use_inter_broker_mechanism_for_client = False):
# If node is not specified, use static jaas config which will be created later.
@@ -315,10 +346,10 @@ class SecurityConfig(TemplateRenderer):
return value
def is_ssl(self, security_protocol):
- return security_protocol == SecurityConfig.SSL or security_protocol == SecurityConfig.SASL_SSL
+ return security_protocol in SecurityConfig.SSL_SECURITY_PROTOCOLS
def is_sasl(self, security_protocol):
- return security_protocol == SecurityConfig.SASL_PLAINTEXT or security_protocol == SecurityConfig.SASL_SSL
+ return security_protocol in SecurityConfig.SASL_SECURITY_PROTOCOLS
def is_sasl_scram(self, sasl_mechanism):
return sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_256 or sasl_mechanism == SecurityConfig.SASL_MECHANISM_SCRAM_SHA_512
@@ -341,7 +372,16 @@ class SecurityConfig(TemplateRenderer):
@property
def enabled_sasl_mechanisms(self):
- return set([self.client_sasl_mechanism, self.interbroker_sasl_mechanism])
+ sasl_mechanisms = []
+ if self.is_sasl(self.security_protocol):
+ sasl_mechanisms += [self.client_sasl_mechanism]
+ if self.is_sasl(self.interbroker_security_protocol):
+ sasl_mechanisms += [self.interbroker_sasl_mechanism]
+ if self.serves_raft_sasl:
+ sasl_mechanisms += list(self.serves_raft_sasl)
+ if self.uses_raft_sasl:
+ sasl_mechanisms += list(self.uses_raft_sasl)
+ return set(sasl_mechanisms)
@property
def has_sasl_kerberos(self):
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 8dcc264..0ce12c9 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -57,7 +57,7 @@ class SecurityTest(EndToEndTest):
return True
- @cluster(num_nodes=7)
+ @cluster(num_nodes=6)
@matrix(security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], metadata_quorum=quorum.all_non_upgrade)
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], metadata_quorum=quorum.all_non_upgrade)
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk):
@@ -78,34 +78,48 @@ class SecurityTest(EndToEndTest):
self.create_kafka(security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol)
+ if self.kafka.quorum_info.using_raft and interbroker_security_protocol == 'SSL':
+ # we don't want to interfere with communication to the controller quorum
+ # (we separately test this below) so make sure it isn't using TLS
+ # (it uses the inter-broker security information by default)
+ controller_quorum = self.kafka.controller_quorum
+ controller_quorum.controller_security_protocol = 'PLAINTEXT'
+ controller_quorum.intercontroller_security_protocol = 'PLAINTEXT'
self.kafka.start()
# now set the certs to have invalid hostnames so we can run the actual test
SecurityConfig.ssl_stores.valid_hostname = False
self.kafka.restart_cluster()
- # We need more verbose logging to catch the expected errors
- self.create_and_start_clients(log_level="DEBUG")
+ if self.kafka.quorum_info.using_raft and security_protocol == 'PLAINTEXT':
+ # the inter-broker security protocol using TLS with a hostname verification failure
+ # doesn't impact a producer in case of a single broker with a Raft Controller,
+ # so confirm that this is in fact the observed behavior
+ self.create_and_start_clients(log_level="INFO")
+ self.run_validation()
+ else:
+ # We need more verbose logging to catch the expected errors
+ self.create_and_start_clients(log_level="DEBUG")
- try:
- wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
+ try:
+ wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
- # Fail quickly if messages are successfully acked
- raise RuntimeError("Messages published successfully but should not have!"
- " Endpoint validation did not fail with invalid hostname")
- except TimeoutError:
- # expected
- pass
+ # Fail quickly if messages are successfully acked
+ raise RuntimeError("Messages published successfully but should not have!"
+ " Endpoint validation did not fail with invalid hostname")
+ except TimeoutError:
+ # expected
+ pass
- error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
- wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=30)
- self.producer.stop()
- self.consumer.stop()
+ error = 'SSLHandshakeException' if security_protocol == 'SSL' else 'LEADER_NOT_AVAILABLE'
+ wait_until(lambda: self.producer_consumer_have_expected_error(error), timeout_sec=30)
+ self.producer.stop()
+ self.consumer.stop()
- SecurityConfig.ssl_stores.valid_hostname = True
- self.kafka.restart_cluster()
- self.create_and_start_clients(log_level="INFO")
- self.run_validation()
+ SecurityConfig.ssl_stores.valid_hostname = True
+ self.kafka.restart_cluster()
+ self.create_and_start_clients(log_level="INFO")
+ self.run_validation()
def create_and_start_clients(self, log_level):
self.create_producer(log_level=log_level)
@@ -113,3 +127,47 @@ class SecurityTest(EndToEndTest):
self.create_consumer(log_level=log_level)
self.consumer.start()
+
+ @cluster(num_nodes=2)
+ @matrix(metadata_quorum=[quorum.zk, quorum.remote_raft])
+ def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk):
+ """
+ Test that invalid hostname in ZooKeeper or Raft Controller results in broker inability to start.
+ """
+ # Start ZooKeeper/Raft-based Controller with valid hostnames in the certs' SANs
+ # so that we can start Kafka
+ SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
+ valid_hostname=True)
+
+ self.create_zookeeper_if_necessary(num_nodes=1,
+ zk_client_port = False,
+ zk_client_secure_port = True,
+ zk_tls_encrypt_only = True,
+ )
+ if self.zk:
+ self.zk.start()
+
+ self.create_kafka(num_nodes=1,
+ interbroker_security_protocol='SSL', # also sets the broker-to-raft-controller security protocol for the Raft case
+ zk_client_secure=True, # ignored if we aren't using ZooKeeper
+ )
+ self.kafka.start()
+
+ # now stop the Kafka broker
+ # and set the cert for ZooKeeper/Raft-based Controller to have an invalid hostname
+ # so we can restart Kafka and ensure it is unable to start
+ self.kafka.stop_node(self.kafka.nodes[0])
+
+ SecurityConfig.ssl_stores.valid_hostname = False
+ if quorum.for_test(self.test_context) == quorum.zk:
+ self.kafka.zk.restart_cluster()
+ else:
+ self.kafka.remote_controller_quorum.restart_cluster()
+
+ try:
+ self.kafka.start_node(self.kafka.nodes[0], timeout_sec=30)
+ raise RuntimeError("Kafka restarted successfully but should not have!"
+ " Endpoint validation did not fail with invalid hostname")
+ except TimeoutError:
+ # expected
+ pass