Posted to commits@kafka.apache.org by ma...@apache.org on 2020/02/25 14:31:45 UTC
[kafka] branch 2.5 updated: KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new edf8809 KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7
edf8809 is described below
commit edf88095612e3fc21fbd58e36be1f7c8b9241599
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Tue Feb 25 19:59:55 2020 +0530
KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7
These changes depend on [KIP-515: Enable ZK client to use the new TLS supported authentication](https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication), which was only added to 2.5.0. The upgrade to ZooKeeper 3.5.7 was merged to both 2.5.0 and 2.4.1 via https://issues.apache.org/jira/browse/KAFKA-9515, but this change must only be merged to 2.5.0 (it will break the system tests if merged to 2.4.1).
Author: Ron Dagostino <rd...@confluent.io>
Reviewers: Manikumar Reddy <ma...@gmail.com>, Andrew Choi <li...@microsoft.com>
Closes #8132 from rondagostino/KAFKA-9567
(cherry picked from commit 9d53ad794de45739093070306f44df2a2f31e9e4)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
docs/security.html | 43 ++++++----
docs/toc.html | 1 +
tests/kafkatest/services/kafka/kafka.py | 92 +++++++++++++++------
.../services/kafka/templates/kafka.properties | 2 +
.../kafkatest/services/security/security_config.py | 9 +++
.../services/templates/zookeeper.properties | 3 +
tests/kafkatest/services/zookeeper.py | 21 +++--
.../tests/core/zookeeper_tls_encrypt_only_test.py | 93 ++++++++++++++++++++++
tests/kafkatest/version.py | 2 +
9 files changed, 219 insertions(+), 47 deletions(-)
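
Reviewer note: the two selection rules this patch adds to the system-test services (which connection flag kafka-topics.sh receives, and which ZooKeeper TLS client config file is passed to the CLI tools) can be sketched in isolation as below. This is a simplified, standalone approximation, not the kafkatest code itself: the function names, the tuple-based version argument, and the sample host strings are hypothetical, while the two file paths and the 2.3.0 cutoff are taken from the diff that follows.

```python
# Standalone sketch of the selection logic added in kafka.py and zookeeper.py.
# The function names and the tuple-based version are illustrative assumptions;
# the real code lives on KafkaService / ZookeeperService and uses KafkaVersion.

# Paths as defined in SecurityConfig by this patch.
MUTUAL_AUTH_CONFIG = "/mnt/security/zk_client_mutual_auth_config.properties"
ENCRYPT_ONLY_CONFIG = "/mnt/security/zk_client_tls_encrypt_only_config.properties"


def supports_bootstrap_server(version):
    """Mirror of topic_command_supports_bootstrap_server(): gated on 2.3.0."""
    return tuple(version) >= (2, 3, 0)


def connect_setting(version, use_zk_connection, bootstrap_servers, zk_connect):
    """Use --bootstrap-server only when the tool supports it and ZooKeeper
    was not explicitly requested; otherwise fall back to --zookeeper."""
    if supports_bootstrap_server(version) and not use_zk_connection:
        return "--bootstrap-server %s" % bootstrap_servers
    return "--zookeeper %s" % zk_connect


def zk_tls_config_file_option(secure_port, encrypt_only, for_zookeeper_main=False):
    """Return the TLS config-file option, or an empty string when the
    ZooKeeper secure port is disabled. The ZooKeeper CLI wrapper takes a
    single-dash flag; the Kafka tools take a double-dash flag."""
    if not secure_port:
        return ""
    flag = "-zk-tls-config-file " if for_zookeeper_main else "--zk-tls-config-file "
    return flag + (ENCRYPT_ONLY_CONFIG if encrypt_only else MUTUAL_AUTH_CONFIG)


if __name__ == "__main__":
    # Hypothetical hosts, for illustration only.
    print(connect_setting((2, 5, 0), False, "broker1:9092", "zk1:2182"))
    print(zk_tls_config_file_option(True, True))
```

The new encrypt-only test relies on the first rule by calling start(use_zk_to_create_topic=False), so topics are created through the Admin client rather than through --zookeeper.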
diff --git a/docs/security.html b/docs/security.html
index f9fca78..4f515e0 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1890,14 +1890,6 @@
It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical --
again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).
</p>
- Note that ZooKeeper version 3.5.6 <strong>requires</strong> clients that connect via TLS to present their own certificate
- (i.e. encryption and mTLS authentication are linked rather than independent of each other in this ZooKeeper version).
- There is a ZooKeeper sever-side config <tt>ssl.clientAuth</tt> that is recognized
- (case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options),
- but this config <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3674">is not acted upon in version 3.5.6</a>.
- A future version of ZooKeeper will both recognize and act upon this config and allow clients to connect via a TLS-encrypted connection
- without presenting their own certificate -- i.e. encryption and authentication will become independent of each other.
- But for now this is not the case.
</p>
<p>
Use the broker properties file to set TLS configs for brokers as described below.
@@ -1935,7 +1927,7 @@
</p>
Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
These configurations are described in the
- <a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
+ <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
<pre>
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
@@ -1949,8 +1941,9 @@
to a value different from the keystore password itself.
Be sure to set the key password to be the same as the keystore password.
- Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
- These configuration are described above in <a href="#brokerconfigs">Broker Configs</a>.
+ <p>Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
+ These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
+ </p>
<pre>
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
@@ -2009,12 +2002,34 @@
<h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. See above for mTLS information. Please refer to the ZooKeeper documentation for more detail:
<ol>
- <li><a href="http://zookeeper.apache.org/doc/r3.5.6/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
+ <li><a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
</ol>
- <h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutial TLS Authentication</a></h4>
+ <h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutual TLS Authentication</a></h4>
It is possible to enable mTLS authentication between the ZooKeeper servers themselves.
- Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
+ Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
+
+ <h3><a id="zk_encryption" href="#zk_encryption">7.7 ZooKeeper Encryption</a></h3>
+ ZooKeeper connections that use mutual TLS are encrypted.
+ Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5), ZooKeeper supports a server-side config
+ <tt>ssl.clientAuth</tt> (case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options, the default is <tt>need</tt>),
+ and setting this value to <tt>none</tt> in ZooKeeper allows clients to connect via a TLS-encrypted connection
+ without presenting their own certificate. Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption.
+ These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
+ <pre>
+ # connect to the ZooKeeper port configured for TLS
+ zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
+ # required to use TLS to ZooKeeper (default is false)
+ zookeeper.ssl.client.enable=true
+ # required to use TLS to ZooKeeper
+ zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
+ # define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
+ # no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
+ zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
+ zookeeper.ssl.truststore.password=kafka-ts-passwd
+ # tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
+ zookeeper.set.acl=true
+ </pre>
</script>
<div class="p-security"></div>
diff --git a/docs/toc.html b/docs/toc.html
index 11c4424..b8c977a 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -135,6 +135,7 @@
<li><a href="#zk_authz_ensemble">Migrating the ZooKeeper Ensemble</a></li>
<li><a href="#zk_authz_quorum">ZooKeeper Quorum Mutual TLS Authentication</a></li>
</ul>
+ <li><a href="#zk_encryption">7.7 ZooKeeper Encryption</a></li>
</ul>
</li>
<li><a href="#connect">8. Kafka Connect</a>
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e59431e..1c3d161 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -70,6 +70,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL'
+ JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
+ KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
logs = {
"kafka_server_start_stdout_stderr": {
@@ -235,8 +237,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def alive(self, node):
return len(self.pids(node)) > 0
- def start(self, add_principals=""):
- if self.zk_client_secure and not self.zk.client_secure_port:
+ def start(self, add_principals="", use_zk_to_create_topic=True):
+ if self.zk_client_secure and not self.zk.zk_client_secure_port:
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
self.open_port(self.security_protocol)
self.interbroker_listener.open = True
@@ -262,7 +264,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
topic_cfg = {}
topic_cfg["topic"] = topic
- self.create_topic(topic_cfg)
+ self.create_topic(topic_cfg, use_zk_to_create_topic=use_zk_to_create_topic)
def _ensure_zk_chroot(self):
self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
@@ -295,8 +297,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
#load template configs as dictionary
config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
- security_config=self.security_config, num_nodes=self.num_nodes,
- listener_security_config=self.listener_security_config)
+ security_config=self.security_config, num_nodes=self.num_nodes,
+ listener_security_config=self.listener_security_config)
configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
if not l.startswith("#") and "=" in l )
@@ -421,7 +423,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
- def create_topic(self, topic_cfg, node=None):
+ def _kafka_topics_cmd(self, node, use_zk_connection=True):
+ """
+ Returns kafka-topics.sh command path with jaas configuration and krb5 environment variable
+ set. If Admin client is not going to be used, don't set the environment variable.
+ """
+ kafka_topic_script = self.path.script("kafka-topics.sh", node)
+ skip_security_settings = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
+ return kafka_topic_script if skip_security_settings else \
+ "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_topic_script)
+
+ def _kafka_topics_cmd_config(self, node, use_zk_connection=True):
+ """
+ Return --command-config parameter to the kafka-topics.sh command. The config parameter specifies
+ the security settings that AdminClient uses to connect to a secure kafka server.
+ """
+ skip_command_config = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
+ return "" if skip_command_config else " --command-config <(echo '%s')" % (self.security_config.client_config())
+
+ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True):
"""Run the admin tool create topic command.
Specifying node is optional, and may be done if for different kafka nodes have different versions,
and we care where command gets run.
@@ -432,13 +452,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node = self.nodes[0]
self.logger.info("Creating topic %s with settings %s",
topic_cfg["topic"], topic_cfg)
- kafka_topic_script = self.path.script("kafka-topics.sh", node)
- cmd = kafka_topic_script + " "
- cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s " % {
- 'zk_connect': self.zk_connect_setting(),
- 'topic': topic_cfg.get("topic"),
- }
+ use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic
+
+ cmd = "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
+ 'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection),
+ 'connection_string': self._connect_setting(node, use_zk_connection),
+ 'topic': topic_cfg.get("topic"),
+ }
if 'replica-assignment' in topic_cfg:
cmd += " --replica-assignment %(replica-assignment)s" % {
'replica-assignment': topic_cfg.get('replica-assignment')
@@ -456,6 +477,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for config_name, config_value in topic_cfg["configs"].items():
cmd += " --config %s=%s" % (config_name, str(config_value))
+ cmd += self._kafka_topics_cmd_config(node, use_zk_connection)
+
self.logger.info("Running topic creation command...\n%s" % cmd)
node.account.ssh(cmd)
@@ -479,21 +502,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Running topic delete command...\n%s" % cmd)
node.account.ssh(cmd)
- def describe_topic(self, topic, node=None):
+ def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True):
if node is None:
node = self.nodes[0]
- cmd = "%s --zookeeper %s --topic %s --describe" % \
- (self.path.script("kafka-topics.sh", node), self.zk_connect_setting(), topic)
+ cmd = "%s %s --topic %s --describe %s" % \
+ (self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic),
+ self._connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic),
+ topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic))
+
+ self.logger.info("Running topic describe command...\n%s" % cmd)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
return output
- def list_topics(self, topic=None, node=None):
+ def list_topics(self, node=None, use_zk_to_list_topic=True):
if node is None:
node = self.nodes[0]
- cmd = "%s --zookeeper %s --list" % \
- (self.path.script("kafka-topics.sh", node), self.zk_connect_setting())
+ cmd = "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
+ self._connect_setting(node, use_zk_to_list_topic),
+ self._kafka_topics_cmd_config(node, use_zk_to_list_topic))
for line in node.account.ssh_capture(cmd):
if not line.startswith("SLF4J"):
yield line.rstrip()
@@ -502,8 +530,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if node is None:
node = self.nodes[0]
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
- cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
- (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic, msg_format_version)
+ cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
+ (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
@@ -514,8 +542,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Enabling unclean leader election for topic %s", topic)
else:
self.logger.info("Disabling unclean leader election for topic %s", topic)
- cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
- (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic, str(value).lower())
+ cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
+ (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, str(value).lower())
self.logger.info("Running alter unclean leader command...\n%s" % cmd)
node.account.ssh(cmd)
@@ -729,12 +757,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
raise
def check_protocol_errors(self, node):
- """ Checks for common protocol exceptions due to invalid inter broker protocol handling.
- While such errors can and should be checked in other ways, checking the logs is a worthwhile failsafe.
- """
+ """ Checks for common protocol exceptions due to invalid inter broker protocol handling.
+ While such errors can and should be checked in other ways, checking the logs is a worthwhile failsafe.
+ """
for node in self.nodes:
exit_code = node.account.ssh("grep -e 'java.lang.IllegalArgumentException: Invalid version' -e SchemaException %s/*"
- % KafkaService.OPERATIONAL_LOG_DEBUG_DIR, allow_fail=True)
+ % KafkaService.OPERATIONAL_LOG_DEBUG_DIR, allow_fail=True)
if exit_code != 1:
return False
return True
@@ -791,6 +819,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def zk_connect_setting(self):
return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
+ def _connect_setting(self, node, use_zk_connection=True):
+ """
+ Checks if --bootstrap-server config is supported, if yes then returns a string with
+ bootstrap server, otherwise returns zookeeper connection string.
+ """
+ if node.version.topic_command_supports_bootstrap_server() and not use_zk_connection:
+ connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol))
+ else:
+ connection_setting = "--zookeeper %s" % (self.zk_connect_setting())
+
+ return connection_setting
+
def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:
raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 766b7bd..9795eac 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -58,8 +58,10 @@ ssl.endpoint.identification.algorithm=HTTPS
# Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
# If it is false then these ZK keystore/truststore settings will have no effect. If it is true then
# zookeeper.clientCnxnSocket will also be set elsewhere (to org.apache.zookeeper.ClientCnxnSocketNetty)
+{% if not zk.zk_tls_encrypt_only %}
zookeeper.ssl.keystore.location=/mnt/security/test.keystore.jks
zookeeper.ssl.keystore.password=test-ks-passwd
+{% endif %}
zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
zookeeper.ssl.truststore.password=test-ts-passwd
#
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 42d75cd..f70d7d2 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -76,6 +76,14 @@ class SslStores(object):
self.runcmd("keytool -importcert -keystore %s -storepass %s -storetype JKS -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
+ # generate ZooKeeper client TLS config file for encryption-only (no client cert) use case
+ str = """zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
+zookeeper.ssl.client.enable=true
+zookeeper.ssl.truststore.location=%s
+zookeeper.ssl.truststore.password=%s
+""" % (SecurityConfig.TRUSTSTORE_PATH, self.truststore_passwd)
+ node.account.create_file(SecurityConfig.ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH, str)
+
# also generate ZooKeeper client TLS config file for mutual authentication use case
str = """zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
@@ -120,6 +128,7 @@ class SecurityConfig(TemplateRenderer):
CONFIG_DIR = "/mnt/security"
KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
+ ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH = "/mnt/security/zk_client_tls_encrypt_only_config.properties"
ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH = "/mnt/security/zk_client_mutual_auth_config.properties"
JAAS_CONF_PATH = "/mnt/security/jaas.conf"
KRB5CONF_PATH = "/mnt/security/krb5.conf"
diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties
index ef73133..0dfd80d 100644
--- a/tests/kafkatest/services/templates/zookeeper.properties
+++ b/tests/kafkatest/services/templates/zookeeper.properties
@@ -27,6 +27,9 @@ ssl.keyStore.type=JKS
ssl.trustStore.location=/mnt/security/test.truststore.jks
ssl.trustStore.password=test-ts-passwd
ssl.trustStore.type=JKS
+{% if zk_tls_encrypt_only %}
+ssl.clientAuth=none
+{% endif %}
{% endif %}
maxClientCnxns=0
initLimit=5
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 5b173cc..1131a34 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -44,7 +44,8 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
"collect_default": True}
}
- def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False):
+ def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False,
+ zk_tls_encrypt_only = False):
"""
:type context
"""
@@ -54,6 +55,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
raise Exception("Cannot disable both ZK clientPort and clientSecurePort")
self.zk_client_port = zk_client_port
self.zk_client_secure_port = zk_client_secure_port
+ self.zk_tls_encrypt_only = zk_tls_encrypt_only
super(ZookeeperService, self).__init__(context, num_nodes)
@property
@@ -147,6 +149,12 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
return ','.join([node.account.hostname + (':2182' if not self.zk_client_port or force_tls else ':2181') + chroot
for node in self.nodes])
+ def zkTlsConfigFileOption(self, forZooKeeperMain=False):
+ if not self.zk_client_secure_port:
+ return ""
+ return ("-zk-tls-config-file " if forZooKeeperMain else "--zk-tls-config-file ") + \
+ (SecurityConfig.ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH if self.zk_tls_encrypt_only else SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH)
+
#
# This call is used to simulate a rolling upgrade to enable/disable
# the use of ZooKeeper ACLs.
@@ -157,8 +165,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
la_migra_cmd += "%s --zookeeper.acl=%s --zookeeper.connect=%s %s" % \
(self.path.script("zookeeper-security-migration.sh", node), zk_acl,
self.connect_setting(force_tls=self.zk_client_secure_port),
- "--zk-tls-config-file=" + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH \
- if self.zk_client_secure_port else "")
+ self.zkTlsConfigFileOption())
node.account.ssh(la_migra_cmd)
def _check_chroot(self, chroot):
@@ -176,7 +183,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s %s -server %s %s get %s" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
- "-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+ self.zkTlsConfigFileOption(True),
chroot_path)
self.logger.debug(cmd)
@@ -201,7 +208,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s %s -server %s %s create %s '%s'" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
- "-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+ self.zkTlsConfigFileOption(True),
chroot_path, value)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
@@ -215,7 +222,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s kafka.admin.ConfigCommand --zookeeper %s %s --describe --topic %s" % \
(kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
- "--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+ self.zkTlsConfigFileOption(),
topic)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
@@ -229,7 +236,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s kafka.admin.AclCommand --authorizer-properties zookeeper.connect=%s %s --list --topic %s" % \
(kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
- "--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+ self.zkTlsConfigFileOption(),
topic)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
diff --git a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py
new file mode 100644
index 0000000..4f1cf44
--- /dev/null
+++ b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix, ignore
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import logging
+
+class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest):
+ """Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper.
+ """
+
+ def __init__(self, test_context):
+ super(ZookeeperTlsEncryptOnlyTest, self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.group = "group"
+ self.producer_throughput = 100
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ self.zk = ZookeeperService(self.test_context, num_nodes=3,
+ zk_client_port = False, zk_client_secure_port = True, zk_tls_encrypt_only = True)
+
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, zk_client_secure=True, topics={self.topic: {
+ "partitions": 3,
+ "replication-factor": 3,
+ 'configs': {"min.insync.replicas": 2}}})
+
+ def create_producer_and_consumer(self):
+ self.producer = VerifiableProducer(
+ self.test_context, self.num_producers, self.kafka, self.topic,
+ throughput=self.producer_throughput)
+
+ self.consumer = ConsoleConsumer(
+ self.test_context, self.num_consumers, self.kafka, self.topic,
+ consumer_timeout_ms=60000, message_validator=is_int)
+
+ self.consumer.group_id = self.group
+
+ def perform_produce_consume_validation(self):
+ self.create_producer_and_consumer()
+ self.run_produce_consume_validate()
+ self.producer.free()
+ self.consumer.free()
+
+ @cluster(num_nodes=9)
+ def test_zk_tls_encrypt_only(self):
+ self.zk.start()
+ self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
+
+ # Cannot use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled ZooKeeper quorum,
+ # so indicate that topics should be created via the Admin client
+ self.kafka.start(use_zk_to_create_topic=False)
+
+ self.perform_produce_consume_validation()
+
+ # Make sure the ZooKeeper command line is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+ # Test both create() and query(), each of which leverages the ZooKeeper command line
+ # This tests the code in org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka
+ path="/foo"
+ value="{\"bar\": 0}"
+ self.zk.create(path, value=value)
+ if self.zk.query(path) != value:
+ raise Exception("Error creating and then querying a znode using the CLI with a TLS-enabled ZooKeeper quorum")
+
+ # Make sure the ConfigCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+ # This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
+ self.zk.describe(self.topic)
+
+ # Make sure the AclCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+ # This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
+ self.zk.list_acls(self.topic)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index c90c853..26537e3 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -52,6 +52,8 @@ class KafkaVersion(LooseVersion):
def supports_named_listeners(self):
return self >= V_0_10_2_0
+ def topic_command_supports_bootstrap_server(self):
+ return self >= V_2_3_0
def get_version(node=None):
"""Return the version attached to the given node.