Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/27 00:25:19 UTC

[kafka] branch 2.3 updated (ae2d59c -> 5bbe32b)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from ae2d59c  KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
     new 6de76f6  KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)
     new 1482278  MINOR: Fix failing upgrade test by supporting both security.inter.broker.protocol and inter.broker.listener.name depending on kafka version (#7000)
     new 5bbe32b  MINOR:  Support listener config overrides in system tests (#6981)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/kafkatest/services/kafka/kafka.py            | 135 +++++++++++++++------
 .../services/kafka/templates/kafka.properties      |  25 +++-
 .../services/security/listener_security_config.py  |  36 ++++++
 .../kafkatest/services/security/security_config.py |  35 ++++--
 .../tests/core/security_rolling_upgrade_test.py    |  96 +++++++++++----
 tests/kafkatest/version.py                         |   3 +
 6 files changed, 261 insertions(+), 69 deletions(-)
 create mode 100644 tests/kafkatest/services/security/listener_security_config.py


[kafka] 01/03: KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6de76f6fdf725bab7d6aa52893ba840e6518a7a4
Author: Stanislav Vodetskyi <49...@users.noreply.github.com>
AuthorDate: Fri Jun 21 09:51:43 2019 -0700

    KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)
    
    Reviewers: Brian Bushree <bb...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 137 ++++++++++++++++-----
 .../services/kafka/templates/kafka.properties      |   3 +-
 .../tests/core/security_rolling_upgrade_test.py    |  96 +++++++++++----
 3 files changed, 181 insertions(+), 55 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index fe972b9..e6e0256 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -32,7 +32,24 @@ from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
-Port = collections.namedtuple('Port', ['name', 'number', 'open'])
+
+class KafkaListener:
+
+    def __init__(self, name, port_number, security_protocol, open=False):
+        self.name = name
+        self.port_number = port_number
+        self.security_protocol = security_protocol
+        self.open = open
+
+    def listener(self):
+        return "%s://:%s" % (self.name, str(self.port_number))
+
+    def advertised_listener(self, node):
+        return "%s://%s:%s" % (self.name, node.account.hostname, str(self.port_number))
+
+    def listener_security_protocol(self):
+        return "%s:%s" % (self.name, self.security_protocol)
+
 
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
@@ -50,6 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
+    INTERBROKER_LISTENER_NAME = 'INTERNAL'
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -76,11 +94,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 per_node_server_prop_overrides=None, extra_kafka_opts=""):
+                 use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""):
         """
-        :type context
-        :type zk: ZookeeperService
-        :type topics: dict
+        :param context: test context
+        :param ZookeeperService zk:
+        :param dict topics: which topics to create automatically
+        :param str security_protocol: security protocol for clients to use
+        :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
+        :param str client_sasl_mechanism: sasl mechanism for clients to use
+        :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
+        :param str authorizer_class_name: which authorizer class to use
+        :param str version: which kafka version to use. Defaults to "dev" branch
+        :param jmx_object_names:
+        :param jmx_attributes:
+        :param int zk_connect_timeout:
+        :param int zk_session_timeout:
+        :param dict server_prop_overides: overrides for kafka.properties file
+        :param zk_chroot:
+        :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
+        with security protocol set to interbroker_security_protocol value. If set, requires
+        interbroker_security_protocol to be provided.
+        Normally port name is the same as its security protocol, so setting security_protocol and
+        interbroker_security_protocol to the same value will lead to a single port being open and both client
+        and broker-to-broker communication will go over that port. This parameter allows
+        you to add an interbroker listener with the same security protocol as a client listener, but running on a
+        separate port.
         """
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -89,9 +127,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk = zk
 
         self.security_protocol = security_protocol
-        self.interbroker_security_protocol = interbroker_security_protocol
         self.client_sasl_mechanism = client_sasl_mechanism
-        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
@@ -127,37 +163,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk_session_timeout = zk_session_timeout
 
         self.port_mappings = {
-            'PLAINTEXT': Port('PLAINTEXT', 9092, False),
-            'SSL': Port('SSL', 9093, False),
-            'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
-            'SASL_SSL': Port('SASL_SSL', 9095, False)
+            'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
+            'SSL': KafkaListener('SSL', 9093, 'SSL', False),
+            'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False),
+            'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+            KafkaService.INTERBROKER_LISTENER_NAME:
+                KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False)
         }
 
+        self.interbroker_listener = None
+        self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
+
         for node in self.nodes:
             node.version = version
             node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
 
-
     def set_version(self, version):
         for node in self.nodes:
             node.version = version
 
     @property
+    def interbroker_security_protocol(self):
+        return self.interbroker_listener.security_protocol
+
+    # this is required for backwards compatibility - there are a lot of tests that set this property explicitly
+    # meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
+    @interbroker_security_protocol.setter
+    def interbroker_security_protocol(self, security_protocol):
+        self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
+
+    def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
+        self.use_separate_interbroker_listener = use_separate_listener
+
+        if self.use_separate_interbroker_listener:
+            # do not close existing port here since it is not used exclusively for interbroker communication
+            self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
+            self.interbroker_listener.security_protocol = security_protocol
+        else:
+            # close dedicated interbroker port, so it's not dangling in 'listeners' and 'advertised.listeners'
+            self.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
+            self.interbroker_listener = self.port_mappings[security_protocol]
+
+    @property
     def security_config(self):
-        config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
-                              zk_sasl=self.zk.zk_sasl,
-                              client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
+                                zk_sasl=self.zk.zk_sasl,
+                                client_sasl_mechanism=self.client_sasl_mechanism,
+                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+        for port in self.port_mappings.values():
             if port.open:
-                config.enable_security_protocol(port.name)
+                config.enable_security_protocol(port.security_protocol)
         return config
 
-    def open_port(self, protocol):
-        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
+    def open_port(self, listener_name):
+        self.port_mappings[listener_name].open = True
 
-    def close_port(self, protocol):
-        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
+    def close_port(self, listener_name):
+        self.port_mappings[listener_name].open = False
 
     def start_minikdc(self, add_principals=""):
         if self.security_config.has_sasl:
@@ -172,7 +235,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def start(self, add_principals=""):
         self.open_port(self.security_protocol)
-        self.open_port(self.interbroker_security_protocol)
+        self.interbroker_listener.open = True
 
         self.start_minikdc(add_principals)
         self._ensure_zk_chroot()
@@ -210,15 +273,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
+        protocol_map = []
 
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        for port in self.port_mappings.values():
             if port.open:
-                listeners.append(port.name + "://:" + str(port.number))
-                advertised_listeners.append(port.name + "://" +  node.account.hostname + ":" + str(port.number))
+                listeners.append(port.listener())
+                advertised_listeners.append(port.advertised_listener(node))
+                protocol_map.append(port.listener_security_protocol())
 
         self.listeners = ','.join(listeners)
         self.advertised_listeners = ','.join(advertised_listeners)
+        self.listener_security_protocol_map = ','.join(protocol_map)
+        self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
 
     def prop_file(self, node):
         self.set_protocol_and_port(node)
@@ -685,18 +751,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot)
 
+    def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
+        if validate and not port.open:
+            raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
+                             str(port.port_number))
+
+        return ','.join([node.account.hostname + ":" + str(port.port_number)
+                         for node in self.nodes
+                         if node not in offline_nodes])
+
     def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
         This is the format expected by many config files.
         """
         port_mapping = self.port_mappings[protocol]
-        self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
-
-        if validate and not port_mapping.open:
-            raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
-
-        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes if node not in offline_nodes])
+        self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number))
+        return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
 
     def controller(self):
         """ Get the controller node
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 4362978..2736e91 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -19,8 +19,9 @@ advertised.host.name={{ node.account.hostname }}
 
 listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
+listener.security.protocol.map={{ listener_security_protocol_map }}
 
-security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
+inter.broker.listener.name={{ interbroker_listener.name }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index ba014ea..a64363c 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -12,15 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize, matrix
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
@@ -66,13 +65,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
             time.sleep(10)
 
     def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
         # Roll cluster to include inter broker security protocol.
-        self.kafka.interbroker_security_protocol = broker_protocol
+        self.kafka.setup_interbroker_listener(broker_protocol)
         self.bounce()
 
         # Roll cluster to disable PLAINTEXT port
-        self.kafka.close_port('PLAINTEXT')
+        self.kafka.close_port(SecurityConfig.PLAINTEXT)
         self.set_authorizer_and_bounce(client_protocol, broker_protocol)
 
     def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
@@ -100,17 +98,31 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # Bounce again with ACLs for new mechanism
         self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
+    def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
+        self.kafka.setup_interbroker_listener(broker_security_protocol, True)
+        self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
+        # kafka opens interbroker port automatically in start() but not in bounce()
+        self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME)
+        self.bounce()
+
+    def remove_separate_broker_listener(self, client_security_protocol, client_sasl_mechanism):
+        # separate interbroker listener port will be closed automatically in setup_interbroker_listener
+        # if not using separate interbroker listener
+        self.kafka.setup_interbroker_listener(client_security_protocol, False)
+        self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+        self.bounce()
+
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SSL"])
+    @matrix(client_protocol=[SecurityConfig.SSL])
     @cluster(num_nodes=9)
-    @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
+    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
         and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
         """
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT)
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
         self.kafka.start()
 
         # Create PLAINTEXT producer and consumer
@@ -125,7 +137,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+    @matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT],
+            broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT])
     def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
         Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
@@ -137,7 +150,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         """
         #Given we have a broker that has both secure and PLAINTEXT ports open
         self.kafka.security_protocol = client_protocol
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
         self.kafka.open_port(broker_protocol)
         self.kafka.start()
 
@@ -148,16 +161,16 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
 
     @cluster(num_nodes=9)
-    @parametrize(new_client_sasl_mechanism='PLAIN')
+    @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
         """
         Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
         and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
         """
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.client_sasl_mechanism = "GSSAPI"
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         # Create SASL/GSSAPI producer and consumer
@@ -172,7 +185,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @parametrize(new_sasl_mechanism='PLAIN')
+    @matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
         """
         Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
@@ -182,10 +195,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         Ensure the producer and consumer run throughout
         """
         #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
         self.kafka.client_sasl_mechanism = new_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         #Create Producer and Consumer using second mechanism
@@ -194,3 +207,44 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
         self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
 
+    @cluster(num_nodes=9)
+    def test_enable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has a single PLAINTEXT listener.
+        Start producing/consuming on PLAINTEXT port.
+        While doing that, do a rolling restart to enable separate secured interbroker port
+        """
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
+
+        self.kafka.start()
+
+        self.create_producer_and_consumer()
+
+        self.run_produce_consume_validate(self.add_separate_broker_listener, SecurityConfig.SASL_SSL,
+                                          SecurityConfig.SASL_MECHANISM_PLAIN)
+
+    @cluster(num_nodes=9)
+    def test_disable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker).
+        Start producer and consumer on SSL listener.
+        Close dedicated interbroker listener via rolling restart.
+        Ensure we can produce and consume via SSL listener throughout.
+        """
+        client_protocol = SecurityConfig.SSL
+        client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.security_protocol = client_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=True)
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.start()
+        # create producer and consumer via client security protocol
+        self.create_producer_and_consumer()
+
+        # run produce/consume/validate loop while disabling a separate interbroker listener via rolling restart
+        self.run_produce_consume_validate(
+            self.remove_separate_broker_listener, client_protocol, client_sasl_mechanism)
+
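
For illustration only, a minimal sketch of how a system test could opt into the
separate interbroker listener added by this commit; the test_context and zk
fixtures are assumed to come from the usual ducktape test setup and are not part
of the change:

    from kafkatest.services.zookeeper import ZookeeperService
    from kafkatest.services.kafka import KafkaService
    from kafkatest.services.security.security_config import SecurityConfig

    # Clients use the SASL_SSL listener on its default port (9095); brokers
    # talk to each other over the dedicated INTERNAL listener on port 9099,
    # reusing the same SASL_SSL security protocol.
    zk = ZookeeperService(test_context, num_nodes=1)
    kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                         security_protocol=SecurityConfig.SASL_SSL,
                         interbroker_security_protocol=SecurityConfig.SASL_SSL,
                         use_separate_interbroker_listener=True)
    zk.start()
    kafka.start()

Note that commit 03/03 in this push replaces the use_separate_interbroker_listener
constructor flag with a ListenerSecurityConfig object.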


[kafka] 03/03: MINOR: Support listener config overrides in system tests (#6981)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5bbe32b174131652e09d7288374d33b3fb35361a
Author: Brian Bushree <bb...@confluent.io>
AuthorDate: Thu Jun 27 10:10:43 2019 -0700

    MINOR:  Support listener config overrides in system tests (#6981)
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 26 +++++++---------
 .../services/kafka/templates/kafka.properties      | 18 +++++++++++
 .../services/security/listener_security_config.py  | 36 ++++++++++++++++++++++
 .../kafkatest/services/security/security_config.py | 35 +++++++++++++--------
 4 files changed, 87 insertions(+), 28 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e6e0256..d39ff6a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -29,6 +29,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import config_property
 from kafkatest.services.monitor.jmx import JmxMixin
 from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
@@ -50,7 +51,6 @@ class KafkaListener:
     def listener_security_protocol(self):
         return "%s:%s" % (self.name, self.security_protocol)
 
-
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
     STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log")
@@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""):
+                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""):
         """
         :param context: test context
         :param ZookeeperService zk:
@@ -111,14 +111,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param int zk_session_timeout:
         :param dict server_prop_overides: overrides for kafka.properties file
         :param zk_chroot:
-        :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
-        with security protocol set to interbroker_security_protocol value. If set, requires
-        interbroker_security_protocol to be provided.
-        Normally port name is the same as its security protocol, so setting security_protocol and
-        interbroker_security_protocol to the same value will lead to a single port being open and both client
-        and broker-to-broker communication will go over that port. This parameter allows
-        you to add an interbroker listener with the same security protocol as a client listener, but running on a
-        separate port.
+        :param ListenerSecurityConfig listener_security_config: listener config to use
         """
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -143,6 +136,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
         self.extra_kafka_opts = extra_kafka_opts
+        self.listener_security_config = listener_security_config
 
         #
         # In a heavily loaded and not very fast machine, it is
@@ -172,7 +166,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         }
 
         self.interbroker_listener = None
-        self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+        self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener)
         self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
 
         for node in self.nodes:
@@ -194,9 +188,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
 
     def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
-        self.use_separate_interbroker_listener = use_separate_listener
+        self.listener_security_config.use_separate_interbroker_listener = use_separate_listener
 
-        if self.use_separate_interbroker_listener:
+        if self.listener_security_config.use_separate_interbroker_listener:
             # do not close existing port here since it is not used exclusively for interbroker communication
             self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
             self.interbroker_listener.security_protocol = security_protocol
@@ -210,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
                                 zk_sasl=self.zk.zk_sasl,
                                 client_sasl_mechanism=self.client_sasl_mechanism,
-                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
+                                listener_security_config=self.listener_security_config)
         for port in self.port_mappings.values():
             if port.open:
                 config.enable_security_protocol(port.security_protocol)
@@ -291,7 +286,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         #load template configs as dictionary
         config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config, num_nodes=self.num_nodes)
+                                 security_config=self.security_config, num_nodes=self.num_nodes,
+                                 listener_security_config=self.listener_security_config)
 
         configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
                         if not l.startswith("#") and "=" in l )
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 11e43be..6060bfa 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -27,6 +27,24 @@ inter.broker.listener.name={{ interbroker_listener.name }}
 security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
 {% endif %}
 
+{% for k, v in listener_security_config.client_listener_overrides.iteritems() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+
+{% if interbroker_listener.name != security_protocol %}
+{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %}
+{% if k.startswith('sasl.') %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }}
+{% else %}
+listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
+{% endif %}
+{% endfor %}
+{% endif %}
+
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
 ssl.key.password=test-key-passwd
diff --git a/tests/kafkatest/services/security/listener_security_config.py b/tests/kafkatest/services/security/listener_security_config.py
new file mode 100644
index 0000000..74e9e39
--- /dev/null
+++ b/tests/kafkatest/services/security/listener_security_config.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class ListenerSecurityConfig:
+
+    def __init__(self, use_separate_interbroker_listener=False,
+                 client_listener_overrides={}, interbroker_listener_overrides={}):
+        """
+        :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
+        with security protocol set to interbroker_security_protocol value. If set, requires
+        interbroker_security_protocol to be provided.
+        Normally port name is the same as its security protocol, so setting security_protocol and
+        interbroker_security_protocol to the same value will lead to a single port being open and both client
+        and broker-to-broker communication will go over that port. This parameter allows
+        you to add an interbroker listener with the same security protocol as a client listener, but running on a
+        separate port.
+        :param dict client_listener_overrides - non-prefixed listener config overrides for named client listener
+        (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+        :param dict interbroker_listener_overrides - non-prefixed listener config overrides for named interbroker
+        listener (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc).
+        """
+        self.use_separate_interbroker_listener = use_separate_interbroker_listener
+        self.client_listener_overrides = client_listener_overrides
+        self.interbroker_listener_overrides = interbroker_listener_overrides
\ No newline at end of file
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b2fa489..0398557 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -19,6 +19,7 @@ from tempfile import mkdtemp
 from shutil import rmtree
 from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
+from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
 import itertools
 
 
@@ -112,7 +113,8 @@ class SecurityConfig(TemplateRenderer):
 
     def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
-                 zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None):
+                 zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
+                 listener_security_config=ListenerSecurityConfig()):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol 
@@ -144,6 +146,7 @@ class SecurityConfig(TemplateRenderer):
         self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
         self.zk_sasl = zk_sasl
         self.static_jaas_conf = static_jaas_conf
+        self.listener_security_config = listener_security_config
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@@ -156,6 +159,7 @@ class SecurityConfig(TemplateRenderer):
             'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
+        self.properties.update(self.listener_security_config.client_listener_overrides)
         self.jaas_override_variables = jaas_override_variables or {}
 
     def client_config(self, template_props="", node=None, jaas_override_variables=None):
@@ -169,7 +173,8 @@ class SecurityConfig(TemplateRenderer):
                               client_sasl_mechanism=self.client_sasl_mechanism,
                               template_props=template_props,
                               static_jaas_conf=static_jaas_conf,
-                              jaas_override_variables=jaas_override_variables)
+                              jaas_override_variables=jaas_override_variables,
+                              listener_security_config=self.listener_security_config)
 
     def enable_security_protocol(self, security_protocol):
         self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
@@ -185,20 +190,24 @@ class SecurityConfig(TemplateRenderer):
         jaas_conf_file = "jaas.conf"
         java_version = node.account.ssh_capture("java -version")
 
-        jaas_conf = self.render_jaas_config(
-            jaas_conf_file,
-            {
-                'node': node,
-                'is_ibm_jdk': any('IBM' in line for line in java_version),
-                'SecurityConfig': SecurityConfig,
-                'client_sasl_mechanism': self.client_sasl_mechanism,
-                'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
-            }
-        )
+        jaas_conf = None
+        if 'sasl.jaas.config' not in self.properties:
+            jaas_conf = self.render_jaas_config(
+                jaas_conf_file,
+                {
+                    'node': node,
+                    'is_ibm_jdk': any('IBM' in line for line in java_version),
+                    'SecurityConfig': SecurityConfig,
+                    'client_sasl_mechanism': self.client_sasl_mechanism,
+                    'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
+                }
+            )
+        else:
+            jaas_conf = self.properties['sasl.jaas.config']
 
         if self.static_jaas_conf:
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
-        else:
+        elif 'sasl.jaas.config' not in self.properties:
             self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
         if self.has_sasl_kerberos:
             node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
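
For illustration only, a hypothetical sketch of passing per-listener overrides
through the new ListenerSecurityConfig; the override values below are
placeholders, and test_context and zk are assumed ducktape fixtures:

    from kafkatest.services.kafka import KafkaService
    from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
    from kafkatest.services.security.security_config import SecurityConfig

    # Keys are non-prefixed listener configs; the kafka.properties template
    # above prefixes them with listener.name.<listener name>. (and, for sasl.*
    # keys, the sasl mechanism) when rendering the broker config.
    listener_security_config = ListenerSecurityConfig(
        use_separate_interbroker_listener=True,
        client_listener_overrides={
            'ssl.keystore.location': '/mnt/security/client.keystore.jks'},  # placeholder path
        interbroker_listener_overrides={
            'sasl.login.callback.handler.class': 'com.example.LoginHandler'})  # placeholder class

    kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                         security_protocol=SecurityConfig.SSL,
                         interbroker_security_protocol=SecurityConfig.SASL_SSL,
                         listener_security_config=listener_security_config)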


[kafka] 02/03: MINOR: Fix failing upgrade test by supporting both security.inter.broker.protocol and inter.broker.listener.name depending on kafka version (#7000)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 14822787d659840b564fc1bf5ce26dd74fba69c7
Author: Stanislav Vodetskyi <49...@users.noreply.github.com>
AuthorDate: Thu Jun 27 09:50:17 2019 -0700

    MINOR: Fix failing upgrade test by supporting both security.inter.broker.protocol and inter.broker.listener.name depending on kafka version (#7000)
    
    Reviewers: Brian Bushree <bb...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 tests/kafkatest/services/kafka/templates/kafka.properties | 4 ++++
 tests/kafkatest/version.py                                | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 2736e91..11e43be 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -21,7 +21,11 @@ listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
 listener.security.protocol.map={{ listener_security_protocol_map }}
 
+{% if node.version.supports_named_listeners() %}
 inter.broker.listener.name={{ interbroker_listener.name }}
+{% else %}
+security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
+{% endif %}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index a3ef1f1..0be4449 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -49,6 +49,9 @@ class KafkaVersion(LooseVersion):
         else:
             return LooseVersion.__str__(self)
 
+    def supports_named_listeners(self):
+        return self >= V_0_10_2_0
+
 
 def get_version(node=None):
     """Return the version attached to the given node.