You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/27 00:25:20 UTC

[kafka] 01/03: KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6de76f6fdf725bab7d6aa52893ba840e6518a7a4
Author: Stanislav Vodetskyi <49...@users.noreply.github.com>
AuthorDate: Fri Jun 21 09:51:43 2019 -0700

    KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)
    
    Reviewers: Brian Bushree <bb...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 137 ++++++++++++++++-----
 .../services/kafka/templates/kafka.properties      |   3 +-
 .../tests/core/security_rolling_upgrade_test.py    |  96 +++++++++++----
 3 files changed, 181 insertions(+), 55 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index fe972b9..e6e0256 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -32,7 +32,24 @@ from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
-Port = collections.namedtuple('Port', ['name', 'number', 'open'])
+
+class KafkaListener:
+
+    def __init__(self, name, port_number, security_protocol, open=False):
+        self.name = name
+        self.port_number = port_number
+        self.security_protocol = security_protocol
+        self.open = open
+
+    def listener(self):
+        return "%s://:%s" % (self.name, str(self.port_number))
+
+    def advertised_listener(self, node):
+        return "%s://%s:%s" % (self.name, node.account.hostname, str(self.port_number))
+
+    def listener_security_protocol(self):
+        return "%s:%s" % (self.name, self.security_protocol)
+
 
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
@@ -50,6 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
+    INTERBROKER_LISTENER_NAME = 'INTERNAL'
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -76,11 +94,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 per_node_server_prop_overrides=None, extra_kafka_opts=""):
+                 use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""):
         """
-        :type context
-        :type zk: ZookeeperService
-        :type topics: dict
+        :param context: test context
+        :param ZookeeperService zk:
+        :param dict topics: which topics to create automatically
+        :param str security_protocol: security protocol for clients to use
+        :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
+        :param str client_sasl_mechanism: sasl mechanism for clients to use
+        :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
+        :param str authorizer_class_name: which authorizer class to use
+        :param str version: which kafka version to use. Defaults to "dev" branch
+        :param jmx_object_names:
+        :param jmx_attributes:
+        :param int zk_connect_timeout:
+        :param int zk_session_timeout:
+        :param dict server_prop_overides: overrides for kafka.properties file
+        :param zk_chroot:
+        :param bool use_separate_interbroker_listener: if set, will use a separate interbroker listener,
+        with security protocol set to interbroker_security_protocol value. If set, requires
+        interbroker_security_protocol to be provided.
+        Normally port name is the same as its security protocol, so setting security_protocol and
+        interbroker_security_protocol to the same value will lead to a single port being open and both client
+        and broker-to-broker communication will go over that port. This parameter allows
+        you to add an interbroker listener with the same security protocol as a client listener, but running on a
+        separate port.
         """
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -89,9 +127,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk = zk
 
         self.security_protocol = security_protocol
-        self.interbroker_security_protocol = interbroker_security_protocol
         self.client_sasl_mechanism = client_sasl_mechanism
-        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
@@ -127,37 +163,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.zk_session_timeout = zk_session_timeout
 
         self.port_mappings = {
-            'PLAINTEXT': Port('PLAINTEXT', 9092, False),
-            'SSL': Port('SSL', 9093, False),
-            'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
-            'SASL_SSL': Port('SASL_SSL', 9095, False)
+            'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
+            'SSL': KafkaListener('SSL', 9093, 'SSL', False),
+            'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False),
+            'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+            KafkaService.INTERBROKER_LISTENER_NAME:
+                KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False)
         }
 
+        self.interbroker_listener = None
+        self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
+
         for node in self.nodes:
             node.version = version
             node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
 
-
     def set_version(self, version):
         for node in self.nodes:
             node.version = version
 
     @property
+    def interbroker_security_protocol(self):
+        return self.interbroker_listener.security_protocol
+
+    # this is required for backwards compatibility - there are a lot of tests that set this property explicitly
+    # meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
+    @interbroker_security_protocol.setter
+    def interbroker_security_protocol(self, security_protocol):
+        self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
+
+    def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
+        self.use_separate_interbroker_listener = use_separate_listener
+
+        if self.use_separate_interbroker_listener:
+            # do not close existing port here since it is not used exclusively for interbroker communication
+            self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
+            self.interbroker_listener.security_protocol = security_protocol
+        else:
+            # close dedicated interbroker port, so it's not dangling in 'listeners' and 'advertised.listeners'
+            self.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
+            self.interbroker_listener = self.port_mappings[security_protocol]
+
+    @property
     def security_config(self):
-        config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
-                              zk_sasl=self.zk.zk_sasl,
-                              client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
+                                zk_sasl=self.zk.zk_sasl,
+                                client_sasl_mechanism=self.client_sasl_mechanism,
+                                interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+        for port in self.port_mappings.values():
             if port.open:
-                config.enable_security_protocol(port.name)
+                config.enable_security_protocol(port.security_protocol)
         return config
 
-    def open_port(self, protocol):
-        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
+    def open_port(self, listener_name):
+        self.port_mappings[listener_name].open = True
 
-    def close_port(self, protocol):
-        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
+    def close_port(self, listener_name):
+        self.port_mappings[listener_name].open = False
 
     def start_minikdc(self, add_principals=""):
         if self.security_config.has_sasl:
@@ -172,7 +235,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def start(self, add_principals=""):
         self.open_port(self.security_protocol)
-        self.open_port(self.interbroker_security_protocol)
+        self.interbroker_listener.open = True
 
         self.start_minikdc(add_principals)
         self._ensure_zk_chroot()
@@ -210,15 +273,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
+        protocol_map = []
 
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        for port in self.port_mappings.values():
             if port.open:
-                listeners.append(port.name + "://:" + str(port.number))
-                advertised_listeners.append(port.name + "://" +  node.account.hostname + ":" + str(port.number))
+                listeners.append(port.listener())
+                advertised_listeners.append(port.advertised_listener(node))
+                protocol_map.append(port.listener_security_protocol())
 
         self.listeners = ','.join(listeners)
         self.advertised_listeners = ','.join(advertised_listeners)
+        self.listener_security_protocol_map = ','.join(protocol_map)
+        self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
 
     def prop_file(self, node):
         self.set_protocol_and_port(node)
@@ -685,18 +751,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot)
 
+    def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
+        if validate and not port.open:
+            raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
+                             str(port.port_number))
+
+        return ','.join([node.account.hostname + ":" + str(port.port_number)
+                         for node in self.nodes
+                         if node not in offline_nodes])
+
     def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
         This is the format expected by many config files.
         """
         port_mapping = self.port_mappings[protocol]
-        self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
-
-        if validate and not port_mapping.open:
-            raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
-
-        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes if node not in offline_nodes])
+        self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number))
+        return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
 
     def controller(self):
         """ Get the controller node
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 4362978..2736e91 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -19,8 +19,9 @@ advertised.host.name={{ node.account.hostname }}
 
 listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
+listener.security.protocol.map={{ listener_security_protocol_map }}
 
-security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
+inter.broker.listener.name={{ interbroker_listener.name }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index ba014ea..a64363c 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -12,15 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize, matrix
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
@@ -66,13 +65,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
             time.sleep(10)
 
     def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
         # Roll cluster to include inter broker security protocol.
-        self.kafka.interbroker_security_protocol = broker_protocol
+        self.kafka.setup_interbroker_listener(broker_protocol)
         self.bounce()
 
         # Roll cluster to disable PLAINTEXT port
-        self.kafka.close_port('PLAINTEXT')
+        self.kafka.close_port(SecurityConfig.PLAINTEXT)
         self.set_authorizer_and_bounce(client_protocol, broker_protocol)
 
     def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
@@ -100,17 +98,31 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # Bounce again with ACLs for new mechanism
         self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
+    def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
+        self.kafka.setup_interbroker_listener(broker_security_protocol, True)
+        self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
+        # kafka opens interbroker port automatically in start() but not in bounce()
+        self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME)
+        self.bounce()
+
+    def remove_separate_broker_listener(self, client_security_protocol, client_sasl_mechanism):
+        # separate interbroker listener port will be closed automatically in setup_interbroker_listener
+        # if not using separate interbroker listener
+        self.kafka.setup_interbroker_listener(client_security_protocol, False)
+        self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+        self.bounce()
+
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SSL"])
+    @matrix(client_protocol=[SecurityConfig.SSL])
     @cluster(num_nodes=9)
-    @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
+    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
         and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
         """
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT)
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
         self.kafka.start()
 
         # Create PLAINTEXT producer and consumer
@@ -125,7 +137,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+    @matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT],
+            broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT])
     def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
         Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
@@ -137,7 +150,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         """
         #Given we have a broker that has both secure and PLAINTEXT ports open
         self.kafka.security_protocol = client_protocol
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
         self.kafka.open_port(broker_protocol)
         self.kafka.start()
 
@@ -148,16 +161,16 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
 
     @cluster(num_nodes=9)
-    @parametrize(new_client_sasl_mechanism='PLAIN')
+    @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
         """
         Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
         and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
         """
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.client_sasl_mechanism = "GSSAPI"
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         # Create SASL/GSSAPI producer and consumer
@@ -172,7 +185,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @parametrize(new_sasl_mechanism='PLAIN')
+    @matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
         """
         Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
@@ -182,10 +195,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         Ensure the producer and consumer run throughout
         """
         #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
         self.kafka.client_sasl_mechanism = new_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         #Create Producer and Consumer using second mechanism
@@ -194,3 +207,44 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
         self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
 
+    @cluster(num_nodes=9)
+    def test_enable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has a single PLAINTEXT listener.
+        Start producing/consuming on PLAINTEXT port.
+        While doing that, do a rolling restart to enable separate secured interbroker port
+        """
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
+
+        self.kafka.start()
+
+        self.create_producer_and_consumer()
+
+        self.run_produce_consume_validate(self.add_separate_broker_listener, SecurityConfig.SASL_SSL,
+                                          SecurityConfig.SASL_MECHANISM_PLAIN)
+
+    @cluster(num_nodes=9)
+    def test_disable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker).
+        Start producer and consumer on SSL listener.
+        Close dedicated interbroker listener via rolling restart.
+        Ensure we can produce and consume via SSL listener throughout.
+        """
+        client_protocol = SecurityConfig.SSL
+        client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.security_protocol = client_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=True)
+        self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.start()
+        # create producer and consumer via client security protocol
+        self.create_producer_and_consumer()
+
+        # run produce/consume/validate loop while disabling a separate interbroker listener via rolling restart
+        self.run_produce_consume_validate(
+            self.remove_separate_broker_listener, client_protocol, client_sasl_mechanism)
+