You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2019/06/27 17:10:57 UTC
[kafka] branch trunk updated: MINOR: Support listener config overrides in system tests (#6981)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 357aede MINOR: Support listener config overrides in system tests (#6981)
357aede is described below
commit 357aedeb1b9d4b0e7c40b29487751bd3a8ae009b
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Thu Jun 27 10:10:43 2019 -0700
MINOR: Support listener config overrides in system tests (#6981)
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
tests/kafkatest/services/kafka/kafka.py | 26 +++++++---------
.../services/kafka/templates/kafka.properties | 18 +++++++++++
.../services/security/listener_security_config.py | 36 ++++++++++++++++++++++
.../kafkatest/services/security/security_config.py | 35 +++++++++++++--------
4 files changed, 87 insertions(+), 28 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index d77c784..bc99d96 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -29,6 +29,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import config_property
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
@@ -50,7 +51,6 @@ class KafkaListener:
def listener_security_protocol(self):
return "%s:%s" % (self.name, self.security_protocol)
-
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
PERSISTENT_ROOT = "/mnt/kafka"
STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
@@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
- use_separate_interbroker_listener=False):
+ listener_security_config=ListenerSecurityConfig()):
"""
:param context: test context
:param ZookeeperService zk:
@@ -111,14 +111,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param int zk_session_timeout:
:param dict server_prop_overides: overrides for kafka.properties file
:param zk_chroot:
- :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
- with security protocol set to interbroker_security_protocol value. If set, requires
- interbroker_security_protocol to be provided.
- Normally port name is the same as its security protocol, so setting security_protocol and
- interbroker_security_protocol to the same value will lead to a single port being open and both client
- and broker-to-broker communication will go over that port. This parameter allows
- you to add an interbroker listener with the same security protocol as a client listener, but running on a
- separate port.
+ :param ListenerSecurityConfig listener_security_config: listener config to use
"""
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -138,6 +131,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.server_prop_overides = server_prop_overides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
+ self.listener_security_config = listener_security_config
#
# In a heavily loaded and not very fast machine, it is
@@ -167,7 +161,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
}
self.interbroker_listener = None
- self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+ self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
for node in self.nodes:
@@ -189,9 +183,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
- self.use_separate_interbroker_listener = use_separate_listener
+ self.listener_security_config.use_separate_interbroker_listener = use_separate_listener
- if self.use_separate_interbroker_listener:
+ if self.listener_security_config.use_separate_interbroker_listener:
# do not close existing port here since it is not used exclusively for interbroker communication
self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
self.interbroker_listener.security_protocol = security_protocol
@@ -205,7 +199,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
zk_sasl=self.zk.zk_sasl,
client_sasl_mechanism=self.client_sasl_mechanism,
- interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+ interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
+ listener_security_config=self.listener_security_config)
for port in self.port_mappings.values():
if port.open:
config.enable_security_protocol(port.security_protocol)
@@ -286,7 +281,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
#load template configs as dictionary
config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
- security_config=self.security_config, num_nodes=self.num_nodes)
+ security_config=self.security_config, num_nodes=self.num_nodes,
+ listener_security_config=self.listener_security_config)
configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 11e43be..6060bfa 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -27,6 +27,24 @@ inter.broker.listener.name={{ interbroker_listener.name }}
security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
{% endif %}
+{# Client listener overrides: 'sasl.*' keys are also scoped by the client SASL mechanism. #}
+{% for k, v in listener_security_config.client_listener_overrides.items() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+{# Interbroker overrides are rendered only when the interbroker listener name differs from security_protocol. #}
+{% if interbroker_listener.name != security_protocol %}
+{% for k, v in listener_security_config.interbroker_listener_overrides.items() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+{% endif %}
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
ssl.key.password=test-key-passwd
diff --git a/tests/kafkatest/services/security/listener_security_config.py b/tests/kafkatest/services/security/listener_security_config.py
new file mode 100644
index 0000000..74e9e39
--- /dev/null
+++ b/tests/kafkatest/services/security/listener_security_config.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class ListenerSecurityConfig:
+
+ def __init__(self, use_separate_interbroker_listener=False,
+ client_listener_overrides={}, interbroker_listener_overrides={}):
+ """
+ :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
+ with security protocol set to interbroker_security_protocol value. If set, requires
+ interbroker_security_protocol to be provided.
+ Normally port name is the same as its security protocol, so setting security_protocol and
+ interbroker_security_protocol to the same value will lead to a single port being open and both client
+ and broker-to-broker communication will go over that port. This parameter allows
+ you to add an interbroker listener with the same security protocol as a client listener, but running on a
+ separate port.
+ :param dict client_listener_overrides - non-prefixed listener config overrides for named client listener
+ (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+ :param dict interbroker_listener_overrides - non-prefixed listener config overrides for named interbroker
+ listener (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+ """
+ self.use_separate_interbroker_listener = use_separate_interbroker_listener
+ self.client_listener_overrides = client_listener_overrides
+ self.interbroker_listener_overrides = interbroker_listener_overrides
\ No newline at end of file
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b2fa489..0398557 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -19,6 +19,7 @@ from tempfile import mkdtemp
from shutil import rmtree
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
import itertools
@@ -112,7 +113,8 @@ class SecurityConfig(TemplateRenderer):
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
- zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None):
+ zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
+ listener_security_config=ListenerSecurityConfig()):
"""
Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol
@@ -144,6 +146,7 @@ class SecurityConfig(TemplateRenderer):
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
self.zk_sasl = zk_sasl
self.static_jaas_conf = static_jaas_conf
+ self.listener_security_config = listener_security_config
self.properties = {
'security.protocol' : security_protocol,
'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,6 +159,7 @@ class SecurityConfig(TemplateRenderer):
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
}
+ self.properties.update(self.listener_security_config.client_listener_overrides)
self.jaas_override_variables = jaas_override_variables or {}
def client_config(self, template_props="", node=None, jaas_override_variables=None):
@@ -169,7 +173,8 @@ class SecurityConfig(TemplateRenderer):
client_sasl_mechanism=self.client_sasl_mechanism,
template_props=template_props,
static_jaas_conf=static_jaas_conf,
- jaas_override_variables=jaas_override_variables)
+ jaas_override_variables=jaas_override_variables,
+ listener_security_config=self.listener_security_config)
def enable_security_protocol(self, security_protocol):
self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
@@ -185,20 +190,24 @@ class SecurityConfig(TemplateRenderer):
jaas_conf_file = "jaas.conf"
java_version = node.account.ssh_capture("java -version")
- jaas_conf = self.render_jaas_config(
- jaas_conf_file,
- {
- 'node': node,
- 'is_ibm_jdk': any('IBM' in line for line in java_version),
- 'SecurityConfig': SecurityConfig,
- 'client_sasl_mechanism': self.client_sasl_mechanism,
- 'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
- }
- )
+ jaas_conf = None
+ if 'sasl.jaas.config' not in self.properties:
+ jaas_conf = self.render_jaas_config(
+ jaas_conf_file,
+ {
+ 'node': node,
+ 'is_ibm_jdk': any('IBM' in line for line in java_version),
+ 'SecurityConfig': SecurityConfig,
+ 'client_sasl_mechanism': self.client_sasl_mechanism,
+ 'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
+ }
+ )
+ else:
+ jaas_conf = self.properties['sasl.jaas.config']
if self.static_jaas_conf:
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
- else:
+ elif 'sasl.jaas.config' not in self.properties:
self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
if self.has_sasl_kerberos:
node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)