You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/05/27 00:25:20 UTC
[kafka] 01/03: KAFKA-8557: system tests - add support for
(optional) interbroker listener with the same security protocol as client
listeners (#6938)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6de76f6fdf725bab7d6aa52893ba840e6518a7a4
Author: Stanislav Vodetskyi <49...@users.noreply.github.com>
AuthorDate: Fri Jun 21 09:51:43 2019 -0700
KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938)
Reviewers: Brian Bushree <bb...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
tests/kafkatest/services/kafka/kafka.py | 137 ++++++++++++++++-----
.../services/kafka/templates/kafka.properties | 3 +-
.../tests/core/security_rolling_upgrade_test.py | 96 +++++++++++----
3 files changed, 181 insertions(+), 55 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index fe972b9..e6e0256 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -32,7 +32,24 @@ from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
-Port = collections.namedtuple('Port', ['name', 'number', 'open'])
+
+class KafkaListener:
+
+ def __init__(self, name, port_number, security_protocol, open=False):
+ self.name = name
+ self.port_number = port_number
+ self.security_protocol = security_protocol
+ self.open = open
+
+ def listener(self):
+ return "%s://:%s" % (self.name, str(self.port_number))
+
+ def advertised_listener(self, node):
+ return "%s://%s:%s" % (self.name, node.account.hostname, str(self.port_number))
+
+ def listener_security_protocol(self):
+ return "%s:%s" % (self.name, self.security_protocol)
+
class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
PERSISTENT_ROOT = "/mnt/kafka"
@@ -50,6 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# Kafka Authorizer
SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
+ INTERBROKER_LISTENER_NAME = 'INTERNAL'
logs = {
"kafka_server_start_stdout_stderr": {
@@ -76,11 +94,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
- per_node_server_prop_overrides=None, extra_kafka_opts=""):
+ use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""):
"""
- :type context
- :type zk: ZookeeperService
- :type topics: dict
+ :param context: test context
+ :param ZookeeperService zk:
+ :param dict topics: which topics to create automatically
+ :param str security_protocol: security protocol for clients to use
+ :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
+ :param str client_sasl_mechanism: sasl mechanism for clients to use
+ :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
+ :param str authorizer_class_name: which authorizer class to use
+ :param str version: which kafka version to use. Defaults to "dev" branch
+ :param jmx_object_names:
+ :param jmx_attributes:
+ :param int zk_connect_timeout:
+ :param int zk_session_timeout:
+ :param dict server_prop_overides: overrides for kafka.properties file
+ :param zk_chroot:
+ :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener,
+ with security protocol set to interbroker_security_protocol value. If set, requires
+ interbroker_security_protocol to be provided.
+ Normally port name is the same as its security protocol, so setting security_protocol and
+ interbroker_security_protocol to the same value will lead to a single port being open and both client
+ and broker-to-broker communication will go over that port. This parameter allows
+ you to add an interbroker listener with the same security protocol as a client listener, but running on a
+ separate port.
"""
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -89,9 +127,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.zk = zk
self.security_protocol = security_protocol
- self.interbroker_security_protocol = interbroker_security_protocol
self.client_sasl_mechanism = client_sasl_mechanism
- self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self.topics = topics
self.minikdc = None
self.authorizer_class_name = authorizer_class_name
@@ -127,37 +163,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.zk_session_timeout = zk_session_timeout
self.port_mappings = {
- 'PLAINTEXT': Port('PLAINTEXT', 9092, False),
- 'SSL': Port('SSL', 9093, False),
- 'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
- 'SASL_SSL': Port('SASL_SSL', 9095, False)
+ 'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
+ 'SSL': KafkaListener('SSL', 9093, 'SSL', False),
+ 'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False),
+ 'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+ KafkaService.INTERBROKER_LISTENER_NAME:
+ KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False)
}
+ self.interbroker_listener = None
+ self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener)
+ self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
+
for node in self.nodes:
node.version = version
node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
-
def set_version(self, version):
for node in self.nodes:
node.version = version
@property
+ def interbroker_security_protocol(self):
+ return self.interbroker_listener.security_protocol
+
+ # this is required for backwards compatibility - there are a lot of tests that set this property explicitly
+ # meaning 'use one of the existing listeners that match given security protocol, do not use custom listener'
+ @interbroker_security_protocol.setter
+ def interbroker_security_protocol(self, security_protocol):
+ self.setup_interbroker_listener(security_protocol, use_separate_listener=False)
+
+ def setup_interbroker_listener(self, security_protocol, use_separate_listener=False):
+ self.use_separate_interbroker_listener = use_separate_listener
+
+ if self.use_separate_interbroker_listener:
+ # do not close existing port here since it is not used exclusively for interbroker communication
+ self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
+ self.interbroker_listener.security_protocol = security_protocol
+ else:
+ # close dedicated interbroker port, so it's not dangling in 'listeners' and 'advertised.listeners'
+ self.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
+ self.interbroker_listener = self.port_mappings[security_protocol]
+
+ @property
def security_config(self):
- config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol,
- zk_sasl=self.zk.zk_sasl,
- client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
- for protocol in self.port_mappings:
- port = self.port_mappings[protocol]
+ config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
+ zk_sasl=self.zk.zk_sasl,
+ client_sasl_mechanism=self.client_sasl_mechanism,
+ interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+ for port in self.port_mappings.values():
if port.open:
- config.enable_security_protocol(port.name)
+ config.enable_security_protocol(port.security_protocol)
return config
- def open_port(self, protocol):
- self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
+ def open_port(self, listener_name):
+ self.port_mappings[listener_name].open = True
- def close_port(self, protocol):
- self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
+ def close_port(self, listener_name):
+ self.port_mappings[listener_name].open = False
def start_minikdc(self, add_principals=""):
if self.security_config.has_sasl:
@@ -172,7 +235,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def start(self, add_principals=""):
self.open_port(self.security_protocol)
- self.open_port(self.interbroker_security_protocol)
+ self.interbroker_listener.open = True
self.start_minikdc(add_principals)
self._ensure_zk_chroot()
@@ -210,15 +273,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def set_protocol_and_port(self, node):
listeners = []
advertised_listeners = []
+ protocol_map = []
- for protocol in self.port_mappings:
- port = self.port_mappings[protocol]
+ for port in self.port_mappings.values():
if port.open:
- listeners.append(port.name + "://:" + str(port.number))
- advertised_listeners.append(port.name + "://" + node.account.hostname + ":" + str(port.number))
+ listeners.append(port.listener())
+ advertised_listeners.append(port.advertised_listener(node))
+ protocol_map.append(port.listener_security_protocol())
self.listeners = ','.join(listeners)
self.advertised_listeners = ','.join(advertised_listeners)
+ self.listener_security_protocol_map = ','.join(protocol_map)
+ self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True)
def prop_file(self, node):
self.set_protocol_and_port(node)
@@ -685,18 +751,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def zk_connect_setting(self):
return self.zk.connect_setting(self.zk_chroot)
+ def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
+ if validate and not port.open:
+ raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
+ str(port.port_number))
+
+ return ','.join([node.account.hostname + ":" + str(port.port_number)
+ for node in self.nodes
+ if node not in offline_nodes])
+
def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
This is the format expected by many config files.
"""
port_mapping = self.port_mappings[protocol]
- self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
-
- if validate and not port_mapping.open:
- raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
-
- return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes if node not in offline_nodes])
+ self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number))
+ return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
def controller(self):
""" Get the controller node
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 4362978..2736e91 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -19,8 +19,9 @@ advertised.host.name={{ node.account.hostname }}
listeners={{ listeners }}
advertised.listeners={{ advertised_listeners }}
+listener.security.protocol.map={{ listener_security_protocol_map }}
-security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
+inter.broker.listener.name={{ interbroker_listener.name }}
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index ba014ea..a64363c 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -12,15 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
+from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize, matrix
+from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.security.kafka_acls import ACLs
import time
@@ -66,13 +65,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
time.sleep(10)
def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
# Roll cluster to include inter broker security protocol.
- self.kafka.interbroker_security_protocol = broker_protocol
+ self.kafka.setup_interbroker_listener(broker_protocol)
self.bounce()
# Roll cluster to disable PLAINTEXT port
- self.kafka.close_port('PLAINTEXT')
+ self.kafka.close_port(SecurityConfig.PLAINTEXT)
self.set_authorizer_and_bounce(client_protocol, broker_protocol)
def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
@@ -100,17 +98,31 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
# Bounce again with ACLs for new mechanism
self.set_authorizer_and_bounce(security_protocol, security_protocol)
+ def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
+ self.kafka.setup_interbroker_listener(broker_security_protocol, True)
+ self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
+ # kafka opens interbroker port automatically in start() but not in bounce()
+ self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME)
+ self.bounce()
+
+ def remove_separate_broker_listener(self, client_security_protocol, client_sasl_mechanism):
+ # separate interbroker listener port will be closed automatically in setup_interbroker_listener
+ # if not using separate interbroker listener
+ self.kafka.setup_interbroker_listener(client_security_protocol, False)
+ self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+ self.bounce()
+
@cluster(num_nodes=8)
- @matrix(client_protocol=["SSL"])
+ @matrix(client_protocol=[SecurityConfig.SSL])
@cluster(num_nodes=9)
- @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
+ @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL])
def test_rolling_upgrade_phase_one(self, client_protocol):
"""
Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
"""
- self.kafka.interbroker_security_protocol = "PLAINTEXT"
- self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT)
+ self.kafka.security_protocol = SecurityConfig.PLAINTEXT
self.kafka.start()
# Create PLAINTEXT producer and consumer
@@ -125,7 +137,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=8)
- @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+ @matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT],
+ broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT])
def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
"""
Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
@@ -137,7 +150,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
"""
#Given we have a broker that has both secure and PLAINTEXT ports open
self.kafka.security_protocol = client_protocol
- self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
self.kafka.open_port(broker_protocol)
self.kafka.start()
@@ -148,16 +161,16 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)
@cluster(num_nodes=9)
- @parametrize(new_client_sasl_mechanism='PLAIN')
+ @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
"""
Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
"""
- self.kafka.interbroker_security_protocol = "SASL_SSL"
- self.kafka.security_protocol = "SASL_SSL"
- self.kafka.client_sasl_mechanism = "GSSAPI"
- self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+ self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
+ self.kafka.security_protocol = SecurityConfig.SASL_SSL
+ self.kafka.client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+ self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
self.kafka.start()
# Create SASL/GSSAPI producer and consumer
@@ -172,7 +185,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=8)
- @parametrize(new_sasl_mechanism='PLAIN')
+ @matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
"""
Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
@@ -182,10 +195,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
Ensure the producer and consumer run throughout
"""
#Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
- self.kafka.security_protocol = "SASL_SSL"
- self.kafka.interbroker_security_protocol = "SASL_SSL"
+ self.kafka.security_protocol = SecurityConfig.SASL_SSL
+ self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False)
self.kafka.client_sasl_mechanism = new_sasl_mechanism
- self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+ self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
self.kafka.start()
#Create Producer and Consumer using second mechanism
@@ -194,3 +207,44 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
#Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)
+ @cluster(num_nodes=9)
+ def test_enable_separate_interbroker_listener(self):
+ """
+ Start with a cluster that has a single PLAINTEXT listener.
+ Start producing/consuming on PLAINTEXT port.
+ While doing that, do a rolling restart to enable separate secured interbroker port
+ """
+ self.kafka.security_protocol = SecurityConfig.PLAINTEXT
+ self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False)
+
+ self.kafka.start()
+
+ self.create_producer_and_consumer()
+
+ self.run_produce_consume_validate(self.add_separate_broker_listener, SecurityConfig.SASL_SSL,
+ SecurityConfig.SASL_MECHANISM_PLAIN)
+
+ @cluster(num_nodes=9)
+ def test_disable_separate_interbroker_listener(self):
+ """
+ Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker).
+ Start producer and consumer on SSL listener.
+ Close dedicated interbroker listener via rolling restart.
+ Ensure we can produce and consume via SSL listener throughout.
+ """
+ client_protocol = SecurityConfig.SSL
+ client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+ self.kafka.security_protocol = client_protocol
+ self.kafka.client_sasl_mechanism = client_sasl_mechanism
+ self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=True)
+ self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+ self.kafka.start()
+ # create producer and consumer via client security protocol
+ self.create_producer_and_consumer()
+
+ # run produce/consume/validate loop while disabling a separate interbroker listener via rolling restart
+ self.run_produce_consume_validate(
+ self.remove_separate_broker_listener, client_protocol, client_sasl_mechanism)
+