You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/30 23:13:55 UTC
kafka git commit: KAFKA-2771: Added rolling upgrade system test (ducktape) for Secured Cluster
Repository: kafka
Updated Branches:
refs/heads/trunk a35334908 -> 6b4cc2ea2
KAFKA-2771: Added rolling upgrade system test (ducktape) for Secured Cluster
Tests rolling upgrade from PLAINTEXT to SSL
Author: Ben Stopford <be...@gmail.com>
Reviewers: Geoff Anderson, Ismael Juma
Closes #496 from benstopford/security-upgrade-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b4cc2ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b4cc2ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b4cc2ea
Branch: refs/heads/trunk
Commit: 6b4cc2ea2b141b25852e2110ac4a400905154b92
Parents: a353349
Author: Ben Stopford <be...@gmail.com>
Authored: Mon Nov 30 14:13:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 30 14:13:50 2015 -0800
----------------------------------------------------------------------
tests/kafkatest/services/console_consumer.py | 2 +-
tests/kafkatest/services/kafka/kafka.py | 57 ++++++++-
.../services/kafka/templates/kafka.properties | 16 +--
.../kafkatest/services/kafka_log4j_appender.py | 2 +-
.../performance/consumer_performance.py | 2 +-
.../services/performance/end_to_end_latency.py | 2 +-
.../performance/producer_performance.py | 2 +-
tests/kafkatest/services/verifiable_consumer.py | 5 +-
tests/kafkatest/services/verifiable_producer.py | 2 +-
.../kafkatest/tests/produce_consume_validate.py | 8 +-
.../tests/security_rolling_upgrade_test.py | 124 +++++++++++++++++++
11 files changed, 197 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index e42b20e..b8ad8ab 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -164,7 +164,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['jmx_port'] = self.jmx_port
args['kafka_dir'] = kafka_dir(node)
- args['broker_list'] = self.kafka.bootstrap_servers()
+ args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
args['kafka_opts'] = self.security_config.kafka_opts
cmd = "export JMX_PORT=%(jmx_port)s; " \
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4669a35..809e87f 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -30,6 +30,9 @@ import signal
import subprocess
import time
import os.path
+import collections
+
+Port = collections.namedtuple('Port', ['name', 'number', 'open'])
class KafkaService(JmxMixin, Service):
@@ -73,6 +76,13 @@ class KafkaService(JmxMixin, Service):
self.topics = topics
self.minikdc = None
+ self.port_mappings = {
+ 'PLAINTEXT': Port('PLAINTEXT', 9092, False),
+ 'SSL': Port('SSL', 9093, False),
+ 'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
+ 'SASL_SSL': Port('SASL_SSL', 9095, False)
+ }
+
for node in self.nodes:
node.version = version
node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
@@ -81,11 +91,25 @@ class KafkaService(JmxMixin, Service):
def security_config(self):
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, sasl_mechanism=self.sasl_mechanism)
- def start(self):
+ def open_port(self, protocol):
+ self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
+
+ def close_port(self, protocol):
+ self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
+
+ def start_minikdc(self):
if self.security_config.has_sasl_kerberos:
if self.minikdc is None:
self.minikdc = MiniKdc(self.context, self.nodes)
self.minikdc.start()
+ else:
+ self.minikdc = None
+
+ def start(self):
+ self.open_port(self.security_protocol)
+ self.open_port(self.interbroker_security_protocol)
+
+ self.start_minikdc()
Service.start(self)
# Create topics if necessary
@@ -97,17 +121,32 @@ class KafkaService(JmxMixin, Service):
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
+ def set_protocol_and_port(self, node):
+ listeners = []
+ advertised_listeners = []
+
+ for protocol in self.port_mappings:
+ port = self.port_mappings[protocol]
+ if port.open:
+ listeners.append(port.name + "://:" + str(port.number))
+ advertised_listeners.append(port.name + "://" + node.account.hostname + ":" + str(port.number))
+
+ self.listeners = ','.join(listeners)
+ self.advertised_listeners = ','.join(advertised_listeners)
+
def prop_file(self, node):
cfg = KafkaConfig(**node.config)
cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
+ self.set_protocol_and_port(node)
+
# TODO - clean up duplicate configuration logic
prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
- security_config=self.security_config,
- interbroker_security_protocol=self.interbroker_security_protocol,
- sasl_mechanism=self.sasl_mechanism)
+ security_config=self.security_config,
+ interbroker_security_protocol=self.interbroker_security_protocol,
+ sasl_mechanism=self.sasl_mechanism)
return prop_file
def start_cmd(self, node):
@@ -308,9 +347,15 @@ class KafkaService(JmxMixin, Service):
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
return self.get_node(leader_idx)
- def bootstrap_servers(self):
+ def bootstrap_servers(self, protocol='PLAINTEXT'):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
This is the format expected by many config files.
"""
- return ','.join([node.account.hostname + ":9092" for node in self.nodes])
+ port_mapping = self.port_mappings[protocol]
+ self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
+
+ if not port_mapping.open:
+ raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
+
+ return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index e938ac8..a2baac1 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -17,13 +17,9 @@
advertised.host.name={{ node.account.hostname }}
-{% if security_protocol == interbroker_security_protocol %}
-listeners={{ security_protocol }}://:9092
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092
-{% else %}
-listeners={{ security_protocol }}://:9092,{{ interbroker_security_protocol }}://:9093
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092,{{ interbroker_security_protocol }}://{{ node.account.hostname }}:9093
-{% endif %}
+
+listeners={{ listeners }}
+advertised.listeners={{ advertised_listeners }}
num.network.threads=3
num.io.threads=8
@@ -65,3 +61,9 @@ ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
sasl.mechanism={{ sasl_mechanism }}
sasl.kerberos.service.name=kafka
+
+{% if replica_lag is defined %}
+replica.lag.time.max.ms={{replica_lag}}
+{% endif %}
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index af65eea..0cc39c0 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -45,7 +45,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
def start_cmd(self, node):
cmd = "/opt/%s/bin/" % kafka_dir(node)
cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
- cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+ cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index 4d24628..f8289bc 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -96,7 +96,7 @@ class ConsumerPerformanceService(PerformanceService):
if self.new_consumer:
args['new-consumer'] = ""
- args['broker-list'] = self.kafka.bootstrap_servers()
+ args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
args['zookeeper'] = self.kafka.zk.connect_setting()
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index e7147c8..049eebc 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -49,7 +49,7 @@ class EndToEndLatencyService(PerformanceService):
security_config_file = ""
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
- 'bootstrap_servers': self.kafka.bootstrap_servers(),
+ 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'security_config_file': security_config_file,
'kafka_dir': kafka_dir(node)
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index b94aab6..7cbc7bb 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -47,7 +47,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
def _worker(self, idx, node):
args = self.args.copy()
args.update({
- 'bootstrap_servers': self.kafka.bootstrap_servers(),
+ 'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'jmx_port': self.jmx_port,
'client_id': self.client_id,
'kafka_directory': kafka_dir(node)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 23b0586..51013c0 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -223,9 +223,10 @@ class VerifiableConsumer(BackgroundThreadService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
- " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
- (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout,
+ " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
+ (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout,
"--enable-autocommit" if self.enable_autocommit else "")
+
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index c0dec4d..62c4002 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -118,7 +118,7 @@ class VerifiableProducer(BackgroundThreadService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
- " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+ " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
if self.throughput > 0:
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 3d5d565..e01c70f 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -38,7 +38,7 @@ class ProduceConsumeValidateTest(Test):
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
err_msg="Producer failed to start in a reasonable amount of time.")
self.consumer.start()
- wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10,
+ wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=30,
err_msg="Consumer failed to start in a reasonable amount of time.")
def stop_producer_and_consumer(self):
@@ -51,19 +51,19 @@ class ProduceConsumeValidateTest(Test):
# Check that producer is still successfully producing
currently_acked = self.producer.num_acked
- wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10,
+ wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=30,
err_msg="Expected producer to still be producing.")
self.producer.stop()
self.consumer.wait()
- def run_produce_consume_validate(self, core_test_action=None):
+ def run_produce_consume_validate(self, core_test_action=None, *args):
"""Top-level template for simple produce/consume/validate tests."""
self.start_producer_and_consumer()
if core_test_action is not None:
- core_test_action()
+ core_test_action(*args)
self.stop_producer_and_consumer()
self.validate()
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
new file mode 100644
index 0000000..279cd26
--- /dev/null
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from ducktape.mark import matrix
+import time
+import random
+
+
+class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
+ """Tests a rolling upgrade from PLAINTEXT to a secured cluster
+ """
+
+ def __init__(self, test_context):
+ super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.producer_throughput = 100
+ self.num_producers = 1
+ self.num_consumers = 1
+ self.zk = ZookeeperService(self.test_context, num_nodes=1)
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ "min.insync.replicas": 2}})
+ self.zk.start()
+
+ #reduce replica.lag.time.max.ms due to KAFKA-2827
+ self.kafka.replica_lag = 2000
+
+ def create_producer_and_consumer(self):
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic,
+ throughput=self.producer_throughput)
+
+ self.consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic,
+ consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+
+ self.consumer.group_id = "unique-test-group-" + str(random.random())
+
+ def bounce(self):
+ #Sleeps reduce the intermittent failures reported in KAFKA-2891. Should be removed once resolved.
+ for node in self.kafka.nodes:
+ self.kafka.stop_node(node)
+ time.sleep(10)
+ self.kafka.start_node(node)
+ time.sleep(10)
+
+ def roll_in_secured_settings(self, upgrade_protocol):
+ self.kafka.interbroker_security_protocol = upgrade_protocol
+
+ # Roll cluster to include inter broker security protocol.
+ self.kafka.open_port(upgrade_protocol)
+ self.bounce()
+
+ # Roll cluster to disable PLAINTEXT port
+ self.kafka.close_port('PLAINTEXT')
+ self.bounce()
+
+ def open_secured_port(self, upgrade_protocol):
+ self.kafka.security_protocol = upgrade_protocol
+ self.kafka.open_port(upgrade_protocol)
+ self.kafka.start_minikdc()
+ self.bounce()
+
+ @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+ def test_rolling_upgrade_phase_one(self, upgrade_protocol):
+ """
+ Start with a PLAINTEXT cluster and open a SECURED port via a rolling upgrade, ensuring we can produce
+ and consume over PLAINTEXT throughout. Finally, check we can produce and consume via the new secured port.
+ """
+ self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.kafka.security_protocol = "PLAINTEXT"
+ self.kafka.start()
+
+ #Create PLAINTEXT producer and consumer
+ self.create_producer_and_consumer()
+
+ # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
+ self.run_produce_consume_validate(self.open_secured_port, upgrade_protocol)
+
+ # Now we can produce and consume via the secured port
+ self.kafka.security_protocol = upgrade_protocol
+ self.create_producer_and_consumer()
+ self.run_produce_consume_validate(lambda: time.sleep(1))
+
+ @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+ def test_rolling_upgrade_phase_two(self, upgrade_protocol):
+ """
+ Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
+ Start a Producer and Consumer via the SECURED port
+ Rolling upgrade to make the inter-broker protocol the secure protocol
+ Rolling upgrade again to disable PLAINTEXT
+ Ensure the producer and consumer ran throughout
+ """
+ #Given we have a broker that has both secure and PLAINTEXT ports open
+ self.kafka.security_protocol = upgrade_protocol
+ self.kafka.interbroker_security_protocol = "PLAINTEXT"
+ self.kafka.start()
+
+ #Create Secured Producer and Consumer
+ self.create_producer_and_consumer()
+
+ #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
+ self.run_produce_consume_validate(self.roll_in_secured_settings, upgrade_protocol)