You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/27 00:25:22 UTC

[kafka] 03/03: MINOR: Support listener config overrides in system tests (#6981)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5bbe32b174131652e09d7288374d33b3fb35361a
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Thu Jun 27 10:10:43 2019 -0700

    MINOR:  Support listener config overrides in system tests (#6981)
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 26 +++++++---------
 .../services/kafka/templates/kafka.properties      | 18 +++++++++++
 .../services/security/listener_security_config.py  | 36 ++++++++++++++++++++++
 .../kafkatest/services/security/security_config.py | 35 +++++++++++++--------
 4 files changed, 87 insertions(+), 28 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e6e0256..d39ff6a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -29,6 +29,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import config_property
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
@@ -50,7 +51,6 @@ class KafkaListener:
     def listener_security_protocol(self):
         return "%s:%s" % (self.name, self.security_protocol)
 
-
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
@@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""):
+                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""):
         """
         :param context: test context
         :param ZookeeperService zk:
@@ -111,14 +111,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param int zk_session_timeout:
         :param dict server_prop_overides: overrides for kafka.properties file
         :param zk_chroot:
-        :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
-        with security protocol set to interbroker_security_protocol value. If set, requires
-        interbroker_security_protocol to be provided.
-        Normally port name is the same as its security protocol, so setting security_protocol and
-        interbroker_security_protocol to the same value will lead to a single port being open and both client
-        and broker-to-broker communication will go over that port. This parameter allows
-        you to add an interbroker listener with the same security protocol as a client listener, but running on a
-        separate port.
+        :param ListenerSecurityConfig listener_security_config: listener config to use
         """
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -143,6 +136,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
         self.extra_kafka_opts = extra_kafka_opts
+        self.listener_security_config = listener_security_config
 
         #
         # In a heavily loaded and not very fast machine, it is
@@ -172,7 +166,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         }
 
         self.interbroker_listener = None
-        self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+        self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
         self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
 
         for node in self.nodes:
@@ -194,9 +188,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
 
     def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
-        self.use_separate_interbroker_listener = use_separate_listener
+        self.listener_security_config.use_separate_interbroker_listener = use_separate_listener
 
-        if self.use_separate_interbroker_listener:
+        if self.listener_security_config.use_separate_interbroker_listener:
             # do not close existing port here since it is not used exclusively for interbroker communication
             self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
             self.interbroker_listener.security_protocol = security_protocol
@@ -210,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
                                 zk_sasl=self.zk.zk_sasl,
                                 client_sasl_mechanism=self.client_sasl_mechanism,
-                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
+                                listener_security_config=self.listener_security_config)
         for port in self.port_mappings.values():
             if port.open:
                 config.enable_security_protocol(port.security_protocol)
@@ -291,7 +286,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         #load template configs as dictionary
         config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config, num_nodes=self.num_nodes)
+                                 security_config=self.security_config, num_nodes=self.num_nodes,
+                                 listener_security_config=self.listener_security_config)
 
         configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
                         if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 11e43be..6060bfa 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -27,6 +27,24 @@ inter.broker.listener.name={{ interbroker_listener.name }}
 security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
 {% endif %}
 
+{% for k, v in listener_security_config.client_listener_overrides.iteritems() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+
+{% if interbroker_listener.name != security_protocol %}
+{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+{% endif %}
+
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
 ssl.key.password=test-key-passwd
diff --git a/tests/kafkatest/services/security/listener_security_config.py b/tests/kafkatest/services/security/listener_security_config.py
new file mode 100644
index 0000000..74e9e39
--- /dev/null
+++ b/tests/kafkatest/services/security/listener_security_config.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class ListenerSecurityConfig:
+
+    def __init__(self, use_separate_interbroker_listener=False,
+                 client_listener_overrides={}, interbroker_listener_overrides={}):
+        """
+        :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
+        with security protocol set to interbroker_security_protocol value. If set, requires
+        interbroker_security_protocol to be provided.
+        Normally port name is the same as its security protocol, so setting security_protocol and
+        interbroker_security_protocol to the same value will lead to a single port being open and both client
+        and broker-to-broker communication will go over that port. This parameter allows
+        you to add an interbroker listener with the same security protocol as a client listener, but running on a
+        separate port.
+        :param dict client_listener_overrides - non-prefixed listener config overrides for named client listener
+        (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+        :param dict interbroker_listener_overrides - non-prefixed listener config overrides for named interbroker
+        listener (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+        """
+        self.use_separate_interbroker_listener = use_separate_interbroker_listener
+        self.client_listener_overrides = client_listener_overrides
+        self.interbroker_listener_overrides = interbroker_listener_overrides
\ No newline at end of file
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b2fa489..0398557 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -19,6 +19,7 @@ from tempfile import mkdtemp
 from shutil import rmtree
 from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 import itertools
 
 
@@ -112,7 +113,8 @@ class SecurityConfig(TemplateRenderer):
 
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
-                 zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None):
+                 zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
+                 listener_security_config=ListenerSecurityConfig()):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -144,6 +146,7 @@ class SecurityConfig(TemplateRenderer):
         self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
         self.zk_sasl = zk_sasl
         self.static_jaas_conf = static_jaas_conf
+        self.listener_security_config = listener_security_config
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,6 +159,7 @@ class SecurityConfig(TemplateRenderer):
             'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
+        self.properties.update(self.listener_security_config.client_listener_overrides)
         self.jaas_override_variables = jaas_override_variables or {}
 
     def client_config(self, template_props="", node=None, jaas_override_variables=None):
@@ -169,7 +173,8 @@ class SecurityConfig(TemplateRenderer):
                               client_sasl_mechanism=self.client_sasl_mechanism,
                               template_props=template_props,
                               static_jaas_conf=static_jaas_conf,
-                              jaas_override_variables=jaas_override_variables)
+                              jaas_override_variables=jaas_override_variables,
+                              listener_security_config=self.listener_security_config)
 
     def enable_security_protocol(self, security_protocol):
         self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
@@ -185,20 +190,24 @@ class SecurityConfig(TemplateRenderer):
         jaas_conf_file = "jaas.conf"
         java_version = node.account.ssh_capture("java -version")
 
-        jaas_conf = self.render_jaas_config(
-            jaas_conf_file,
-            {
-                'node': node,
-                'is_ibm_jdk': any('IBM' in line for line in java_version),
-                'SecurityConfig': SecurityConfig,
-                'client_sasl_mechanism': self.client_sasl_mechanism,
-                'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
-            }
-        )
+        jaas_conf = None
+        if 'sasl.jaas.config' not in self.properties:
+            jaas_conf = self.render_jaas_config(
+                jaas_conf_file,
+                {
+                    'node': node,
+                    'is_ibm_jdk': any('IBM' in line for line in java_version),
+                    'SecurityConfig': SecurityConfig,
+                    'client_sasl_mechanism': self.client_sasl_mechanism,
+                    'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
+                }
+            )
+        else:
+            jaas_conf = self.properties['sasl.jaas.config']
 
         if self.static_jaas_conf:
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
-        else:
+        elif 'sasl.jaas.config' not in self.properties:
             self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
         if self.has_sasl_kerberos:
             node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)