You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/30 23:13:55 UTC

kafka git commit: KAFKA-2771: Added rolling upgrade system test (ducktape) for Secured Cluster

Repository: kafka
Updated Branches:
  refs/heads/trunk a35334908 -> 6b4cc2ea2


KAFKA-2771: Added rolling upgrade system test (ducktape) for Secured Cluster

Tests rolling upgrade from PLAINTEXT to SSL

Author: Ben Stopford <be...@gmail.com>

Reviewers: Geoff Anderson, Ismael Juma

Closes #496 from benstopford/security-upgrade-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b4cc2ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b4cc2ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b4cc2ea

Branch: refs/heads/trunk
Commit: 6b4cc2ea2b141b25852e2110ac4a400905154b92
Parents: a353349
Author: Ben Stopford <be...@gmail.com>
Authored: Mon Nov 30 14:13:50 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 30 14:13:50 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/console_consumer.py    |   2 +-
 tests/kafkatest/services/kafka/kafka.py         |  57 ++++++++-
 .../services/kafka/templates/kafka.properties   |  16 +--
 .../kafkatest/services/kafka_log4j_appender.py  |   2 +-
 .../performance/consumer_performance.py         |   2 +-
 .../services/performance/end_to_end_latency.py  |   2 +-
 .../performance/producer_performance.py         |   2 +-
 tests/kafkatest/services/verifiable_consumer.py |   5 +-
 tests/kafkatest/services/verifiable_producer.py |   2 +-
 .../kafkatest/tests/produce_consume_validate.py |   8 +-
 .../tests/security_rolling_upgrade_test.py      | 124 +++++++++++++++++++
 11 files changed, 197 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index e42b20e..b8ad8ab 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -164,7 +164,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
         args['jmx_port'] = self.jmx_port
         args['kafka_dir'] = kafka_dir(node)
-        args['broker_list'] = self.kafka.bootstrap_servers()
+        args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         args['kafka_opts'] = self.security_config.kafka_opts
 
         cmd = "export JMX_PORT=%(jmx_port)s; " \

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4669a35..809e87f 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -30,6 +30,9 @@ import signal
 import subprocess
 import time
 import os.path
+import collections
+
+Port = collections.namedtuple('Port', ['name', 'number', 'open'])
 
 class KafkaService(JmxMixin, Service):
 
@@ -73,6 +76,13 @@ class KafkaService(JmxMixin, Service):
         self.topics = topics
         self.minikdc = None
 
+        self.port_mappings = {
+            'PLAINTEXT': Port('PLAINTEXT', 9092, False),
+            'SSL': Port('SSL', 9093, False),
+            'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
+            'SASL_SSL': Port('SASL_SSL', 9095, False)
+        }
+
         for node in self.nodes:
             node.version = version
             node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
@@ -81,11 +91,25 @@ class KafkaService(JmxMixin, Service):
     def security_config(self):
         return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, sasl_mechanism=self.sasl_mechanism)
 
-    def start(self):
+    def open_port(self, protocol):
+        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
+
+    def close_port(self, protocol):
+        self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
+
+    def start_minikdc(self):
         if self.security_config.has_sasl_kerberos:
             if self.minikdc is None:
                 self.minikdc = MiniKdc(self.context, self.nodes)
                 self.minikdc.start()
+        else:
+            self.minikdc = None
+
+    def start(self):
+        self.open_port(self.security_protocol)
+        self.open_port(self.interbroker_security_protocol)
+
+        self.start_minikdc()
         Service.start(self)
 
         # Create topics if necessary
@@ -97,17 +121,32 @@ class KafkaService(JmxMixin, Service):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def set_protocol_and_port(self, node):
+        listeners = []
+        advertised_listeners = []
+
+        for protocol in self.port_mappings:
+            port = self.port_mappings[protocol]
+            if port.open:
+                listeners.append(port.name + "://:" + str(port.number))
+                advertised_listeners.append(port.name + "://" +  node.account.hostname + ":" + str(port.number))
+
+        self.listeners = ','.join(listeners)
+        self.advertised_listeners = ','.join(advertised_listeners)
+
     def prop_file(self, node):
         cfg = KafkaConfig(**node.config)
         cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
         cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
 
+        self.set_protocol_and_port(node)
+
         # TODO - clean up duplicate configuration logic
         prop_file = cfg.render()
         prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                  security_config=self.security_config, 
-                                  interbroker_security_protocol=self.interbroker_security_protocol,
-                                  sasl_mechanism=self.sasl_mechanism)
+                                 security_config=self.security_config,
+                                 interbroker_security_protocol=self.interbroker_security_protocol,
+                                 sasl_mechanism=self.sasl_mechanism)
         return prop_file
 
     def start_cmd(self, node):
@@ -308,9 +347,15 @@ class KafkaService(JmxMixin, Service):
         self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
         return self.get_node(leader_idx)
 
-    def bootstrap_servers(self):
+    def bootstrap_servers(self, protocol='PLAINTEXT'):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
         This is the format expected by many config files.
         """
-        return ','.join([node.account.hostname + ":9092" for node in self.nodes])
+        port_mapping = self.port_mappings[protocol]
+        self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
+
+        if not port_mapping.open:
+            raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
+
+        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index e938ac8..a2baac1 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -17,13 +17,9 @@
 
 advertised.host.name={{ node.account.hostname }}
 
-{% if security_protocol == interbroker_security_protocol %}
-listeners={{ security_protocol }}://:9092
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092
-{% else %}
-listeners={{ security_protocol }}://:9092,{{ interbroker_security_protocol }}://:9093
-advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:9092,{{ interbroker_security_protocol }}://{{ node.account.hostname }}:9093
-{% endif %}
+
+listeners={{ listeners }}
+advertised.listeners={{ advertised_listeners }}
 
 num.network.threads=3
 num.io.threads=8
@@ -65,3 +61,9 @@ ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
 sasl.mechanism={{ sasl_mechanism }}
 sasl.kerberos.service.name=kafka
+
+{% if replica_lag is defined %}
+replica.lag.time.max.ms={{replica_lag}}
+{% endif %}
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index af65eea..0cc39c0 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -45,7 +45,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
     def start_cmd(self, node):
         cmd = "/opt/%s/bin/" % kafka_dir(node)
         cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
-        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+        cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))
 
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/consumer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index 4d24628..f8289bc 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -96,7 +96,7 @@ class ConsumerPerformanceService(PerformanceService):
 
         if self.new_consumer:
             args['new-consumer'] = ""
-            args['broker-list'] = self.kafka.bootstrap_servers()
+            args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
         else:
             args['zookeeper'] = self.kafka.zk.connect_setting()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/end_to_end_latency.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
index e7147c8..049eebc 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -49,7 +49,7 @@ class EndToEndLatencyService(PerformanceService):
             security_config_file = ""
         args.update({
             'zk_connect': self.kafka.zk.connect_setting(),
-            'bootstrap_servers': self.kafka.bootstrap_servers(),
+            'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'security_config_file': security_config_file,
             'kafka_dir': kafka_dir(node)
         })

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index b94aab6..7cbc7bb 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -47,7 +47,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     def _worker(self, idx, node):
         args = self.args.copy()
         args.update({
-            'bootstrap_servers': self.kafka.bootstrap_servers(),
+            'bootstrap_servers': self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'jmx_port': self.jmx_port,
             'client_id': self.client_id,
             'kafka_directory': kafka_dir(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 23b0586..51013c0 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -223,9 +223,10 @@ class VerifiableConsumer(BackgroundThreadService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
-              " --group-id %s --topic %s --broker-list %s --session-timeout %s %s" % \
-              (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout,
+              " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
+              (self.group_id, self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol), self.session_timeout,
                "--enable-autocommit" if self.enable_autocommit else "")
+
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index c0dec4d..62c4002 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -118,7 +118,7 @@ class VerifiableProducer(BackgroundThreadService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
-              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
+              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol))
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         if self.throughput > 0:

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 3d5d565..e01c70f 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -38,7 +38,7 @@ class ProduceConsumeValidateTest(Test):
         wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
-        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10,
+        wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=30,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
     def stop_producer_and_consumer(self):
@@ -51,19 +51,19 @@ class ProduceConsumeValidateTest(Test):
 
         # Check that producer is still successfully producing
         currently_acked = self.producer.num_acked
-        wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10,
+        wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=30,
              err_msg="Expected producer to still be producing.")
 
         self.producer.stop()
         self.consumer.wait()
 
-    def run_produce_consume_validate(self, core_test_action=None):
+    def run_produce_consume_validate(self, core_test_action=None, *args):
         """Top-level template for simple produce/consume/validate tests."""
 
         self.start_producer_and_consumer()
 
         if core_test_action is not None:
-            core_test_action()
+            core_test_action(*args)
 
         self.stop_producer_and_consumer()
         self.validate()

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b4cc2ea/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
new file mode 100644
index 0000000..279cd26
--- /dev/null
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from ducktape.mark import matrix
+import time
+import random
+
+
+class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
+    """Tests a rolling upgrade from PLAINTEXT to a secured cluster
+    """
+
+    def __init__(self, test_context):
+        super(TestSecurityRollingUpgrade, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.producer_throughput = 100
+        self.num_producers = 1
+        self.num_consumers = 1
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            "min.insync.replicas": 2}})
+        self.zk.start()
+
+        #reduce replica.lag.time.max.ms due to KAFKA-2827
+        self.kafka.replica_lag = 2000
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=60000, message_validator=is_int, new_consumer=True)
+
+        self.consumer.group_id = "unique-test-group-" + str(random.random())
+
+    def bounce(self):
+        #Sleeps reduce the intermittent failures reported in KAFKA-2891. Should be removed once resolved.
+        for node in self.kafka.nodes:
+            self.kafka.stop_node(node)
+            time.sleep(10)
+            self.kafka.start_node(node)
+            time.sleep(10)
+
+    def roll_in_secured_settings(self, upgrade_protocol):
+        self.kafka.interbroker_security_protocol = upgrade_protocol
+
+        # Roll cluster to include inter broker security protocol.
+        self.kafka.open_port(upgrade_protocol)
+        self.bounce()
+
+        # Roll cluster to disable PLAINTEXT port
+        self.kafka.close_port('PLAINTEXT')
+        self.bounce()
+
+    def open_secured_port(self, upgrade_protocol):
+        self.kafka.security_protocol = upgrade_protocol
+        self.kafka.open_port(upgrade_protocol)
+        self.kafka.start_minikdc()
+        self.bounce()
+
+    @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    def test_rolling_upgrade_phase_one(self, upgrade_protocol):
+        """
+        Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
+        and consume throughout over PLAINTEXT. Finally check we can produce and consume via the new secured port.
+        """
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.start()
+
+        #Create PLAINTEXT producer and consumer
+        self.create_producer_and_consumer()
+
+        # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
+        self.run_produce_consume_validate(self.open_secured_port, upgrade_protocol)
+
+        # Now we can produce and consume via the secured port
+        self.kafka.security_protocol = upgrade_protocol
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate(lambda: time.sleep(1))
+
+    @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    def test_rolling_upgrade_phase_two(self, upgrade_protocol):
+        """
+        Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
+        Start a Producer and Consumer via the SECURED port
+        Rolling upgrade to make the inter-broker protocol the secure protocol
+        Rolling upgrade again to disable PLAINTEXT
+        Ensure the producer and consumer ran throughout
+        """
+        #Given we have a broker that has both secure and PLAINTEXT ports open
+        self.kafka.security_protocol = upgrade_protocol
+        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.start()
+
+        #Create Secured Producer and Consumer
+        self.create_producer_and_consumer()
+
+        #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
+        self.run_produce_consume_validate(self.roll_in_secured_settings, upgrade_protocol)