You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/31 16:15:04 UTC
kafka git commit: KAFKA-3799: Enable SSL endpoint validation in
system tests
Repository: kafka
Updated Branches:
refs/heads/trunk 2f20a3987 -> 2e731a9ee
KAFKA-3799: Enable SSL endpoint validation in system tests
Generate certificates with hostname in SubjectAlternativeName and enable hostname validation.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1483 from rajinisivaram/KAFKA-3799
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e731a9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e731a9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e731a9e
Branch: refs/heads/trunk
Commit: 2e731a9ee002298b4b90f97e9c876b330b005539
Parents: 2f20a39
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Wed Aug 31 09:14:59 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Wed Aug 31 09:14:59 2016 -0700
----------------------------------------------------------------------
.../services/kafka/templates/kafka.properties | 1 +
.../kafkatest/services/kafka_log4j_appender.py | 2 +-
.../services/security/security_config.py | 97 +++++++++++------
tests/kafkatest/tests/core/security_test.py | 106 +++++++++++++++++++
4 files changed, 170 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 9924aeb..c02c64f 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -58,6 +58,7 @@ ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
+ssl.endpoint.identification.algorithm=HTTPS
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index c50cab4..b25d8be 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -53,7 +53,7 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
cmd += " --security-protocol %s" % str(self.security_protocol)
if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
- cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password'])
+ cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd)
if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
self.security_protocol == SecurityConfig.SASL_SSL or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 59a0ed4..40674e8 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -15,43 +15,67 @@
import os
import subprocess
+from tempfile import mkdtemp
+from shutil import rmtree
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
import itertools
-class Keytool(object):
+class SslStores(object):
+ def __init__(self):
+ self.ca_crt_path = "/tmp/test.ca.crt"
+ self.ca_jks_path = "/tmp/test.ca.jks"
+ self.ca_passwd = "test-ca-passwd"
- @staticmethod
- def generate_keystore_truststore(ssl_dir='.'):
+ self.truststore_path = "/tmp/test.truststore.jks"
+ self.truststore_passwd = "test-ts-passwd"
+ self.keystore_passwd = "test-ks-passwd"
+ self.key_passwd = "test-key-passwd"
+
+ for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
+ if os.path.exists(file):
+ os.remove(file)
+
+ def generate_ca(self):
"""
- Generate JKS keystore and truststore and return
- Kafka SSL properties with these stores.
+ Generate CA private key and certificate.
"""
- ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
- ks_password = 'test-ks-passwd'
- key_password = 'test-key-passwd'
- ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
- ts_password = 'test-ts-passwd'
- if os.path.exists(ks_path):
- os.remove(ks_path)
- if os.path.exists(ts_path):
- os.remove(ts_path)
-
- Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
- Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
- Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
- os.remove('test.crt')
-
- return {
- 'ssl.keystore.location' : ks_path,
- 'ssl.keystore.password' : ks_password,
- 'ssl.key.password' : key_password,
- 'ssl.truststore.location' : ts_path,
- 'ssl.truststore.password' : ts_password
- }
- @staticmethod
- def runcmd(cmd):
+ self.runcmd("keytool -genkeypair -alias ca -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -storepass %s -keypass %s -dname CN=SystemTestCA" % (self.ca_jks_path, self.ca_passwd, self.ca_passwd))
+ self.runcmd("keytool -export -alias ca -keystore %s -storepass %s -storetype JKS -rfc -file %s" % (self.ca_jks_path, self.ca_passwd, self.ca_crt_path))
+
+ def generate_truststore(self):
+ """
+ Generate JKS truststore containing CA certificate.
+ """
+
+ self.runcmd("keytool -importcert -alias ca -file %s -keystore %s -storepass %s -storetype JKS -noprompt" % (self.ca_crt_path, self.truststore_path, self.truststore_passwd))
+
+ def generate_and_copy_keystore(self, node):
+ """
+ Generate JKS keystore with certificate signed by the test CA.
+ The generated certificate has the node's hostname as a DNS SubjectAlternativeName.
+ """
+
+ ks_dir = mkdtemp(dir="/tmp")
+ ks_path = os.path.join(ks_dir, "test.keystore.jks")
+ csr_path = os.path.join(ks_dir, "test.kafka.csr")
+ crt_path = os.path.join(ks_dir, "test.kafka.crt")
+
+ self.runcmd("keytool -genkeypair -alias kafka -keyalg RSA -keysize 2048 -keystore %s -storepass %s -keypass %s -dname CN=systemtest -ext SAN=DNS:%s" % (ks_path, self.keystore_passwd, self.key_passwd, self.hostname(node)))
+ self.runcmd("keytool -certreq -keystore %s -storepass %s -keypass %s -alias kafka -file %s" % (ks_path, self.keystore_passwd, self.key_passwd, csr_path))
+ self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node)))
+ self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
+ self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
+ node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
+ rmtree(ks_dir)
+
+ def hostname(self, node):
+ """ Hostname which may be overridden for testing validation failures
+ """
+ return node.account.hostname
+
+ def runcmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
proc.communicate()
if proc.returncode != 0:
@@ -73,7 +97,9 @@ class SecurityConfig(TemplateRenderer):
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
- ssl_stores = Keytool.generate_keystore_truststore('.')
+ ssl_stores = SslStores()
+ ssl_stores.generate_ca()
+ ssl_stores.generate_truststore()
def __init__(self, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
@@ -102,10 +128,11 @@ class SecurityConfig(TemplateRenderer):
self.properties = {
'security.protocol' : security_protocol,
'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
- 'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
- 'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
+ 'ssl.keystore.password' : SecurityConfig.ssl_stores.keystore_passwd,
+ 'ssl.key.password' : SecurityConfig.ssl_stores.key_passwd,
'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
- 'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
+ 'ssl.truststore.password' : SecurityConfig.ssl_stores.truststore_passwd,
+ 'ssl.endpoint.identification.algorithm' : 'HTTPS',
'sasl.mechanism' : client_sasl_mechanism,
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
@@ -117,8 +144,8 @@ class SecurityConfig(TemplateRenderer):
def setup_ssl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
- node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
- node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
+ node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
+ SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
def setup_sasl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e731a9e/tests/kafkatest/tests/core/security_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
new file mode 100644
index 0000000..8c150a2
--- /dev/null
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.services.security.security_config import SslStores
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import time
+
+class TestSslStores(SslStores):
+ def __init__(self):
+ super(TestSslStores, self).__init__()
+ self.invalid_hostname = False
+ self.generate_ca()
+ self.generate_truststore()
+
+ def hostname(self, node):
+ if (self.invalid_hostname):
+ return "invalidhost"
+ else:
+ return super(TestSslStores, self).hostname(node)
+
+class SecurityTest(ProduceConsumeValidateTest):
+ """
+ These tests validate security features.
+ """
+
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(SecurityTest, self).__init__(test_context=test_context)
+
+ self.topic = "test_topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
+ "partitions": 2,
+ "replication-factor": 1}
+ })
+ self.num_partitions = 2
+ self.timeout_sec = 10000
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def setUp(self):
+ self.zk.start()
+
+ @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
+ @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
+ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
+ """
+ Test that invalid hostname in certificate results in connection failures.
+ When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
+ When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
+ with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
+ """
+
+ self.kafka.security_protocol = security_protocol
+ self.kafka.interbroker_security_protocol = interbroker_security_protocol
+ SecurityConfig.ssl_stores = TestSslStores()
+
+ SecurityConfig.ssl_stores.invalid_hostname = True
+ self.kafka.start()
+ self.create_producer_and_consumer()
+ self.producer.log_level = "TRACE"
+ self.producer.start()
+ self.consumer.start()
+ time.sleep(10)
+ assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
+ error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
+ for node in self.producer.nodes:
+ node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
+ for node in self.consumer.nodes:
+ node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
+
+ self.producer.stop()
+ self.consumer.stop()
+ self.producer.log_level = "INFO"
+
+ SecurityConfig.ssl_stores.invalid_hostname = False
+ for node in self.kafka.nodes:
+ self.kafka.restart_node(node, clean_shutdown=True)
+ self.create_producer_and_consumer()
+ self.run_produce_consume_validate()
+
+ def create_producer_and_consumer(self):
+ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int)
+