You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/31 16:15:04 UTC

kafka git commit: KAFKA-3799: Enable SSL endpoint validation in system tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 2f20a3987 -> 2e731a9ee


KAFKA-3799: Enable SSL endpoint validation in system tests

Generate certificates with hostname in SubjectAlternativeName and enable hostname validation.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1483 from rajinisivaram/KAFKA-3799


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e731a9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e731a9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e731a9e

Branch: refs/heads/trunk
Commit: 2e731a9ee002298b4b90f97e9c876b330b005539
Parents: 2f20a39
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Aug 31 09:14:59 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Aug 31 09:14:59 2016 -0700

----------------------------------------------------------------------
 .../services/kafka/templates/kafka.properties   |   1 +
 .../kafkatest/services/kafka_log4j_appender.py  |   2 +-
 .../services/security/security_config.py        |  97 +++++++++++------
 tests/kafkatest/tests/core/security_test.py     | 106 +++++++++++++++++++
 4 files changed, 170 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 9924aeb..c02c64f 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -58,6 +58,7 @@ ssl.keystore.type=JKS
 ssl.truststore.location=/mnt/security/test.truststore.jks
 ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
+ssl.endpoint.identification.algorithm=HTTPS
 sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
 sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
 sasl.kerberos.service.name=kafka

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index c50cab4..b25d8be 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -53,7 +53,7 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
             cmd += " --security-protocol %s" % str(self.security_protocol)
         if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
             cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
-            cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password'])
+            cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd)
         if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
                 self.security_protocol == SecurityConfig.SASL_SSL or \
                 self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 59a0ed4..40674e8 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -15,43 +15,67 @@
 
 import os
 import subprocess
+from tempfile import mkdtemp
+from shutil import rmtree
 from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
 import itertools
 
-class Keytool(object):
+class SslStores(object):
+    def __init__(self):
+        self.ca_crt_path = "/tmp/test.ca.crt"
+        self.ca_jks_path = "/tmp/test.ca.jks"
+        self.ca_passwd = "test-ca-passwd"
 
-    @staticmethod
-    def generate_keystore_truststore(ssl_dir='.'):
+        self.truststore_path = "/tmp/test.truststore.jks"
+        self.truststore_passwd = "test-ts-passwd"
+        self.keystore_passwd = "test-ks-passwd"
+        self.key_passwd = "test-key-passwd"
+
+        for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
+            if os.path.exists(file):
+                os.remove(file)
+
+    def generate_ca(self):
         """
-        Generate JKS keystore and truststore and return
-        Kafka SSL properties with these stores.
+        Generate CA private key and certificate.
         """
-        ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
-        ks_password = 'test-ks-passwd'
-        key_password = 'test-key-passwd'
-        ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
-        ts_password = 'test-ts-passwd'
-        if os.path.exists(ks_path):
-            os.remove(ks_path)
-        if os.path.exists(ts_path):
-            os.remove(ts_path)
-        
-        Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
-        Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
-        Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
-        os.remove('test.crt')
-
-        return {
-            'ssl.keystore.location' : ks_path,
-            'ssl.keystore.password' : ks_password,
-            'ssl.key.password' : key_password,
-            'ssl.truststore.location' : ts_path,
-            'ssl.truststore.password' : ts_password
-        }
 
-    @staticmethod
-    def runcmd(cmd):
+        self.runcmd("keytool -genkeypair -alias ca -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -storepass %s -keypass %s -dname CN=SystemTestCA" % (self.ca_jks_path, self.ca_passwd, self.ca_passwd))
+        self.runcmd("keytool -export -alias ca -keystore %s -storepass %s -storetype JKS -rfc -file %s" % (self.ca_jks_path, self.ca_passwd, self.ca_crt_path))
+
+    def generate_truststore(self):
+        """
+        Generate JKS truststore containing CA certificate.
+        """
+
+        self.runcmd("keytool -importcert -alias ca -file %s -keystore %s -storepass %s -storetype JKS -noprompt" % (self.ca_crt_path, self.truststore_path, self.truststore_passwd))
+
+    def generate_and_copy_keystore(self, node):
+        """
+        Generate JKS keystore with certificate signed by the test CA.
+        The generated certificate has the node's hostname as a DNS SubjectAlternativeName.
+        """
+
+        ks_dir = mkdtemp(dir="/tmp")
+        ks_path = os.path.join(ks_dir, "test.keystore.jks")
+        csr_path = os.path.join(ks_dir, "test.kafka.csr")
+        crt_path = os.path.join(ks_dir, "test.kafka.crt")
+
+	self.runcmd("keytool -genkeypair -alias kafka -keyalg RSA -keysize 2048 -keystore %s -storepass %s -keypass %s -dname CN=systemtest -ext SAN=DNS:%s" % (ks_path, self.keystore_passwd, self.key_passwd, self.hostname(node)))
+	self.runcmd("keytool -certreq -keystore %s -storepass %s -keypass %s -alias kafka -file %s" % (ks_path, self.keystore_passwd, self.key_passwd, csr_path))
+	self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node)))
+	self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
+	self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
+        node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
+        rmtree(ks_dir)
+
+    def hostname(self, node):
+        """ Hostname which may be overridden for testing validation failures
+        """
+        return node.account.hostname
+
+    def runcmd(self, cmd):
         proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
         proc.communicate()
         if proc.returncode != 0:
@@ -73,7 +97,9 @@ class SecurityConfig(TemplateRenderer):
     KRB5CONF_PATH = "/mnt/security/krb5.conf"
     KEYTAB_PATH = "/mnt/security/keytab"
 
-    ssl_stores = Keytool.generate_keystore_truststore('.')
+    ssl_stores = SslStores()
+    ssl_stores.generate_ca()
+    ssl_stores.generate_truststore()
 
     def __init__(self, security_protocol=None, interbroker_security_protocol=None,
                  client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
@@ -102,10 +128,11 @@ class SecurityConfig(TemplateRenderer):
         self.properties = {
             'security.protocol' : security_protocol,
             'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
-            'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
-            'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
+            'ssl.keystore.password' : SecurityConfig.ssl_stores.keystore_passwd,
+            'ssl.key.password' : SecurityConfig.ssl_stores.key_passwd,
             'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
-            'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
+            'ssl.truststore.password' : SecurityConfig.ssl_stores.truststore_passwd,
+            'ssl.endpoint.identification.algorithm' : 'HTTPS',
             'sasl.mechanism' : client_sasl_mechanism,
             'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
@@ -117,8 +144,8 @@ class SecurityConfig(TemplateRenderer):
 
     def setup_ssl(self, node):
         node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
-        node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
-        node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
+        node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
+        SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
 
     def setup_sasl(self, node):
         node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
new file mode 100644
index 0000000..8c150a2
--- /dev/null
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SslStores
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import time
+
+class TestSslStores(SslStores):
+    def __init__(self):
+        super(TestSslStores, self).__init__()
+        self.invalid_hostname = False
+        self.generate_ca()
+        self.generate_truststore()
+
+    def hostname(self, node):
+        if (self.invalid_hostname):
+            return "invalidhost"
+        else:
+            return super(TestSslStores, self).hostname(node)
+
+class SecurityTest(ProduceConsumeValidateTest):
+    """
+    These tests validate security features.
+    """
+
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(SecurityTest, self).__init__(test_context=test_context)
+
+        self.topic = "test_topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+                                                                    "partitions": 2,
+                                                                    "replication-factor": 1}
+                                                                })
+        self.num_partitions = 2
+        self.timeout_sec = 10000
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def setUp(self):
+        self.zk.start()
+
+    @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
+    @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
+        """
+        Test that invalid hostname in certificate results in connection failures.
+        When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
+        When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
+        with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
+        """
+
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = interbroker_security_protocol
+        SecurityConfig.ssl_stores = TestSslStores()
+
+        SecurityConfig.ssl_stores.invalid_hostname = True
+        self.kafka.start()
+        self.create_producer_and_consumer()
+        self.producer.log_level = "TRACE"
+        self.producer.start()
+        self.consumer.start()
+        time.sleep(10)
+        assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
+        error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
+        for node in self.producer.nodes:
+            node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
+        for node in self.consumer.nodes:
+            node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+
+        self.producer.stop()
+        self.consumer.stop()
+        self.producer.log_level = "INFO"
+
+        SecurityConfig.ssl_stores.invalid_hostname = False
+        for node in self.kafka.nodes:
+            self.kafka.restart_node(node, clean_shutdown=True)
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate()
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int)
+