You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/02/25 14:31:45 UTC

[kafka] branch 2.5 updated: KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new edf8809  KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7
edf8809 is described below

commit edf88095612e3fc21fbd58e36be1f7c8b9241599
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Tue Feb 25 19:59:55 2020 +0530

    KAFKA-9567: Docs, system tests for ZooKeeper 3.5.7
    
    These changes depend on [KIP-515: Enable ZK client to use the new TLS supported authentication](https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication), which was only added to 2.5.0. The upgrade to ZooKeeper 3.5.7 was merged to both 2.5.0 and 2.4.1 via https://issues.apache.org/jira/browse/KAFKA-9515, but this change must only be merged to 2.5.0 (it will break the system tests if merged to 2.4.1).
    
    Author: Ron Dagostino <rd...@confluent.io>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Andrew Choi <li...@microsoft.com>
    
    Closes #8132 from rondagostino/KAFKA-9567
    
    (cherry picked from commit 9d53ad794de45739093070306f44df2a2f31e9e4)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 docs/security.html                                 | 43 ++++++----
 docs/toc.html                                      |  1 +
 tests/kafkatest/services/kafka/kafka.py            | 92 +++++++++++++++------
 .../services/kafka/templates/kafka.properties      |  2 +
 .../kafkatest/services/security/security_config.py |  9 +++
 .../services/templates/zookeeper.properties        |  3 +
 tests/kafkatest/services/zookeeper.py              | 21 +++--
 .../tests/core/zookeeper_tls_encrypt_only_test.py  | 93 ++++++++++++++++++++++
 tests/kafkatest/version.py                         |  2 +
 9 files changed, 219 insertions(+), 47 deletions(-)

diff --git a/docs/security.html b/docs/security.html
index f9fca78..4f515e0 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1890,14 +1890,6 @@
         It is only when  using mTLS authentication alone that all the DNs must match (and SANs become critical --
         again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).
         </p>
-    Note that ZooKeeper version 3.5.6 <strong>requires</strong> clients that connect via TLS to present their own certificate
-    (i.e. encryption and mTLS authentication are linked rather than independent of each other in this ZooKeeper version).
-    There is a ZooKeeper sever-side config <tt>ssl.clientAuth</tt> that is recognized
-    (case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options),
-    but this config <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3674">is not acted upon in version 3.5.6</a>.
-    A future version of ZooKeeper will both recognize and act upon this config and allow clients to connect via a TLS-encrypted connection
-    without presenting their own certificate -- i.e. encryption and authentication will become independent of each other.
-    But for now this is not the case.
     </p>
     <p>
         Use the broker properties file to set TLS configs for brokers as described below.
@@ -1935,7 +1927,7 @@
     </p>
     Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
     These configurations are described in the
-    <a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
+    <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
     <pre>
         secureClientPort=2182
         serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
@@ -1949,8 +1941,9 @@
     to a value different from the keystore password itself.
     Be sure to set the key password to be the same as the keystore password.
 
-    Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
-    These configuration are described above in <a href="#brokerconfigs">Broker Configs</a>.
+    <p>Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
+    These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
+    </p>
     <pre>
         # connect to the ZooKeeper port configured for TLS
         zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
@@ -2009,12 +2002,34 @@
     <h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
     It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. See above for mTLS information.  Please refer to the ZooKeeper documentation for more detail:
     <ol>
-        <li><a href="http://zookeeper.apache.org/doc/r3.5.6/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
+        <li><a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
         <li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
     </ol>
-    <h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutial TLS Authentication</a></h4>
+    <h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutual TLS Authentication</a></h4>
     It is possible to enable mTLS authentication between the ZooKeeper servers themselves.
-    Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
+    Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.7/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
+
+    <h3><a id="zk_encryption" href="#zk_encryption">7.7 ZooKeeper Encryption</a></h3>
+    ZooKeeper connections that use mutual TLS are encrypted.
+    Beginning with ZooKeeper version 3.5.7 (the version shipped with Kafka version 2.5) ZooKeeper supports a sever-side config
+    <tt>ssl.clientAuth</tt> (case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options, the default is <tt>need</tt>),
+    and setting this value to <tt>none</tt> in ZooKeeper allows clients to connect via a TLS-encrypted connection
+    without presenting their own certificate.  Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with just TLS encryption.
+    These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
+    <pre>
+        # connect to the ZooKeeper port configured for TLS
+        zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
+        # required to use TLS to ZooKeeper (default is false)
+        zookeeper.ssl.client.enable=true
+        # required to use TLS to ZooKeeper
+        zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
+        # define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
+        # no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
+        zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
+        zookeeper.ssl.truststore.password=kafka-ts-passwd
+        # tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
+        zookeeper.set.acl=true
+    </pre>
 </script>
 
 <div class="p-security"></div>
diff --git a/docs/toc.html b/docs/toc.html
index 11c4424..b8c977a 100644
--- a/docs/toc.html
+++ b/docs/toc.html
@@ -135,6 +135,7 @@
                     <li><a href="#zk_authz_ensemble">Migrating the ZooKeeper Ensemble</a></li>
                     <li><a href="#zk_authz_quorum">ZooKeeper Quorum Mutual TLS Authentication</a></li>
                 </ul>
+                <li><a href="#zk_encryption">7.7 ZooKeeper Encryption</a></li>
             </ul>
         </li>
         <li><a href="#connect">8. Kafka Connect</a>
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e59431e..1c3d161 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -70,6 +70,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
     INTERBROKER_LISTENER_NAME = 'INTERNAL'
+    JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
+    KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -235,8 +237,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def alive(self, node):
         return len(self.pids(node)) > 0
 
-    def start(self, add_principals=""):
-        if self.zk_client_secure and not self.zk.client_secure_port:
+    def start(self, add_principals="", use_zk_to_create_topic=True):
+        if self.zk_client_secure and not self.zk.zk_client_secure_port:
             raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
         self.open_port(self.security_protocol)
         self.interbroker_listener.open = True
@@ -262,7 +264,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                     topic_cfg = {}
 
                 topic_cfg["topic"] = topic
-                self.create_topic(topic_cfg)
+                self.create_topic(topic_cfg, use_zk_to_create_topic=use_zk_to_create_topic)
 
     def _ensure_zk_chroot(self):
         self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
@@ -295,8 +297,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
         #load template configs as dictionary
         config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config, num_nodes=self.num_nodes,
-                                 listener_security_config=self.listener_security_config)
+                                      security_config=self.security_config, num_nodes=self.num_nodes,
+                                      listener_security_config=self.listener_security_config)
 
         configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n')
                         if not l.startswith("#") and "=" in l )
@@ -421,7 +423,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                                          clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
 
-    def create_topic(self, topic_cfg, node=None):
+    def _kafka_topics_cmd(self, node, use_zk_connection=True):
+        """
+        Returns kafka-topics.sh command path with jaas configuration and krb5 environment variable
+        set. If Admin client is not going to be used, don't set the environment variable.
+        """
+        kafka_topic_script = self.path.script("kafka-topics.sh", node)
+        skip_security_settings = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
+        return kafka_topic_script if skip_security_settings else \
+            "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_topic_script)
+
+    def _kafka_topics_cmd_config(self, node, use_zk_connection=True):
+        """
+        Return --command-config parameter to the kafka-topics.sh command. The config parameter specifies
+        the security settings that AdminClient uses to connect to a secure kafka server.
+        """
+        skip_command_config = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
+        return "" if skip_command_config else " --command-config <(echo '%s')" % (self.security_config.client_config())
+
+    def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True):
         """Run the admin tool create topic command.
         Specifying node is optional, and may be done if for different kafka nodes have different versions,
         and we care where command gets run.
@@ -432,13 +452,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             node = self.nodes[0]
         self.logger.info("Creating topic %s with settings %s",
                          topic_cfg["topic"], topic_cfg)
-        kafka_topic_script = self.path.script("kafka-topics.sh", node)
 
-        cmd = kafka_topic_script + " "
-        cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s " % {
-                'zk_connect': self.zk_connect_setting(),
-                'topic': topic_cfg.get("topic"),
-           }
+        use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic
+
+        cmd = "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
+            'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection),
+            'connection_string': self._connect_setting(node, use_zk_connection),
+            'topic': topic_cfg.get("topic"),
+        }
         if 'replica-assignment' in topic_cfg:
             cmd += " --replica-assignment %(replica-assignment)s" % {
                 'replica-assignment': topic_cfg.get('replica-assignment')
@@ -456,6 +477,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             for config_name, config_value in topic_cfg["configs"].items():
                 cmd += " --config %s=%s" % (config_name, str(config_value))
 
+        cmd += self._kafka_topics_cmd_config(node, use_zk_connection)
+
         self.logger.info("Running topic creation command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -479,21 +502,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Running topic delete command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-    def describe_topic(self, topic, node=None):
+    def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True):
         if node is None:
             node = self.nodes[0]
-        cmd = "%s --zookeeper %s --topic %s --describe" % \
-              (self.path.script("kafka-topics.sh", node), self.zk_connect_setting(), topic)
+        cmd = "%s %s --topic %s --describe %s" % \
+              (self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic),
+               self._connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic),
+               topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic))
+
+        self.logger.info("Running topic describe command...\n%s" % cmd)
         output = ""
         for line in node.account.ssh_capture(cmd):
             output += line
         return output
 
-    def list_topics(self, topic=None, node=None):
+    def list_topics(self, node=None, use_zk_to_list_topic=True):
         if node is None:
             node = self.nodes[0]
-        cmd = "%s --zookeeper %s --list" % \
-              (self.path.script("kafka-topics.sh", node), self.zk_connect_setting())
+        cmd = "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
+                                   self._connect_setting(node, use_zk_to_list_topic),
+                                   self._kafka_topics_cmd_config(node, use_zk_to_list_topic))
         for line in node.account.ssh_capture(cmd):
             if not line.startswith("SLF4J"):
                 yield line.rstrip()
@@ -502,8 +530,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         if node is None:
             node = self.nodes[0]
         self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
-        cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
-              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic, msg_format_version)
+        cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
+              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, msg_format_version)
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -514,8 +542,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             self.logger.info("Enabling unclean leader election for topic %s", topic)
         else:
             self.logger.info("Disabling unclean leader election for topic %s", topic)
-        cmd = "%s --zookeeper %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
-              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), topic, str(value).lower())
+        cmd = "%s --zookeeper %s %s --entity-name %s --entity-type topics --alter --add-config unclean.leader.election.enable=%s" % \
+              (self.path.script("kafka-configs.sh", node), self.zk_connect_setting(), self.zk.zkTlsConfigFileOption(), topic, str(value).lower())
         self.logger.info("Running alter unclean leader command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -729,12 +757,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             raise
 
     def check_protocol_errors(self, node):
-	""" Checks for common protocol exceptions due to invalid inter broker protocol handling.
-        While such errors can and should be checked in other ways, checking the logs is a worthwhile failsafe.
-        """
+        """ Checks for common protocol exceptions due to invalid inter broker protocol handling.
+            While such errors can and should be checked in other ways, checking the logs is a worthwhile failsafe.
+            """
         for node in self.nodes:
             exit_code = node.account.ssh("grep -e 'java.lang.IllegalArgumentException: Invalid version' -e SchemaException %s/*"
-                    % KafkaService.OPERATIONAL_LOG_DEBUG_DIR, allow_fail=True)
+                                         % KafkaService.OPERATIONAL_LOG_DEBUG_DIR, allow_fail=True)
             if exit_code != 1:
                 return False
         return True
@@ -791,6 +819,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
 
+    def _connect_setting(self, node, use_zk_connection=True):
+        """
+        Checks if --bootstrap-server config is supported, if yes then returns a string with
+        bootstrap server, otherwise returns zookeeper connection string.
+        """
+        if node.version.topic_command_supports_bootstrap_server() and not use_zk_connection:
+            connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol))
+        else:
+            connection_setting = "--zookeeper %s" % (self.zk_connect_setting())
+
+        return connection_setting
+
     def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
         if validate and not port.open:
             raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 766b7bd..9795eac 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -58,8 +58,10 @@ ssl.endpoint.identification.algorithm=HTTPS
 # Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
 # If it is false then these ZK keystore/truststore settings will have no effect.  If it is true then
 # zookeeper.clientCnxnSocket will also be set elsewhere (to org.apache.zookeeper.ClientCnxnSocketNetty)
+{% if not zk.zk_tls_encrypt_only %}
 zookeeper.ssl.keystore.location=/mnt/security/test.keystore.jks
 zookeeper.ssl.keystore.password=test-ks-passwd
+{% endif %}
 zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
 zookeeper.ssl.truststore.password=test-ts-passwd
 #
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 42d75cd..f70d7d2 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -76,6 +76,14 @@ class SslStores(object):
         self.runcmd("keytool -importcert -keystore %s -storepass %s -storetype JKS -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
         node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
 
+        # generate ZooKeeper client TLS config file for encryption-only (no client cert) use case
+        str = """zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
+zookeeper.ssl.client.enable=true
+zookeeper.ssl.truststore.location=%s
+zookeeper.ssl.truststore.password=%s
+""" % (SecurityConfig.TRUSTSTORE_PATH, self.truststore_passwd)
+        node.account.create_file(SecurityConfig.ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH, str)
+
         # also generate ZooKeeper client TLS config file for mutual authentication use case
         str = """zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
 zookeeper.ssl.client.enable=true
@@ -120,6 +128,7 @@ class SecurityConfig(TemplateRenderer):
     CONFIG_DIR = "/mnt/security"
     KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
     TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
+    ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH = "/mnt/security/zk_client_tls_encrypt_only_config.properties"
     ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH = "/mnt/security/zk_client_mutual_auth_config.properties"
     JAAS_CONF_PATH = "/mnt/security/jaas.conf"
     KRB5CONF_PATH = "/mnt/security/krb5.conf"
diff --git a/tests/kafkatest/services/templates/zookeeper.properties b/tests/kafkatest/services/templates/zookeeper.properties
index ef73133..0dfd80d 100644
--- a/tests/kafkatest/services/templates/zookeeper.properties
+++ b/tests/kafkatest/services/templates/zookeeper.properties
@@ -27,6 +27,9 @@ ssl.keyStore.type=JKS
 ssl.trustStore.location=/mnt/security/test.truststore.jks
 ssl.trustStore.password=test-ts-passwd
 ssl.trustStore.type=JKS
+{% if zk_tls_encrypt_only %}
+ssl.clientAuth=none
+{% endif %}
 {% endif %}
 maxClientCnxns=0
 initLimit=5
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 5b173cc..1131a34 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -44,7 +44,8 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             "collect_default": True}
     }
 
-    def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False):
+    def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False,
+                 zk_tls_encrypt_only = False):
         """
         :type context
         """
@@ -54,6 +55,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             raise Exception("Cannot disable both ZK clientPort and clientSecurePort")
         self.zk_client_port = zk_client_port
         self.zk_client_secure_port = zk_client_secure_port
+        self.zk_tls_encrypt_only = zk_tls_encrypt_only
         super(ZookeeperService, self).__init__(context, num_nodes)
 
     @property
@@ -147,6 +149,12 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         return ','.join([node.account.hostname + (':2182' if not self.zk_client_port or force_tls else ':2181') + chroot
                          for node in self.nodes])
 
+    def zkTlsConfigFileOption(self, forZooKeeperMain=False):
+        if not self.zk_client_secure_port:
+            return ""
+        return ("-zk-tls-config-file " if forZooKeeperMain else "--zk-tls-config-file ") + \
+               (SecurityConfig.ZK_CLIENT_TLS_ENCRYPT_ONLY_CONFIG_PATH if self.zk_tls_encrypt_only else SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH)
+
     #
     # This call is used to simulate a rolling upgrade to enable/disable
     # the use of ZooKeeper ACLs.
@@ -157,8 +165,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         la_migra_cmd += "%s --zookeeper.acl=%s --zookeeper.connect=%s %s" % \
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl,
                         self.connect_setting(force_tls=self.zk_client_secure_port),
-                        "--zk-tls-config-file=" + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH \
-                            if self.zk_client_secure_port else "")
+                        self.zkTlsConfigFileOption())
         node.account.ssh(la_migra_cmd)
 
     def _check_chroot(self, chroot):
@@ -176,7 +183,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s %s get %s" % \
               (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
-               "-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+               self.zkTlsConfigFileOption(True),
                chroot_path)
         self.logger.debug(cmd)
 
@@ -201,7 +208,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s %s create %s '%s'" % \
               (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
-               "-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+               self.zkTlsConfigFileOption(True),
                chroot_path, value)
         self.logger.debug(cmd)
         output = self.nodes[0].account.ssh_output(cmd)
@@ -215,7 +222,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s kafka.admin.ConfigCommand --zookeeper %s %s --describe --topic %s" % \
               (kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
-               "--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+               self.zkTlsConfigFileOption(),
                topic)
         self.logger.debug(cmd)
         output = self.nodes[0].account.ssh_output(cmd)
@@ -229,7 +236,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s kafka.admin.AclCommand --authorizer-properties zookeeper.connect=%s %s --list --topic %s" % \
               (kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
-               "--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
+               self.zkTlsConfigFileOption(),
                topic)
         self.logger.debug(cmd)
         output = self.nodes[0].account.ssh_output(cmd)
diff --git a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py
new file mode 100644
index 0000000..4f1cf44
--- /dev/null
+++ b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix, ignore
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+
+import logging
+
+class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest):
+    """Tests TLS encryption-only (ssl.clientAuth=none) connectivity to zookeeper.
+    """
+
+    def __init__(self, test_context):
+        super(ZookeeperTlsEncryptOnlyTest, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.group = "group"
+        self.producer_throughput = 100
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        self.zk = ZookeeperService(self.test_context, num_nodes=3,
+                                   zk_client_port = False, zk_client_secure_port = True, zk_tls_encrypt_only = True)
+
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, zk_client_secure=True, topics={self.topic: {
+            "partitions": 3,
+            "replication-factor": 3,
+            'configs': {"min.insync.replicas": 2}}})
+
+    def create_producer_and_consumer(self):
+        self.producer = VerifiableProducer(
+            self.test_context, self.num_producers, self.kafka, self.topic,
+            throughput=self.producer_throughput)
+
+        self.consumer = ConsoleConsumer(
+            self.test_context, self.num_consumers, self.kafka, self.topic,
+            consumer_timeout_ms=60000, message_validator=is_int)
+
+        self.consumer.group_id = self.group
+
+    def perform_produce_consume_validation(self):
+        self.create_producer_and_consumer()
+        self.run_produce_consume_validate()
+        self.producer.free()
+        self.consumer.free()
+
+    @cluster(num_nodes=9)
+    def test_zk_tls_encrypt_only(self):
+        self.zk.start()
+        self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
+
+        # Cannot use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled ZooKeeper quorum,
+        # so indicate that topics should be created via the Admin client
+        self.kafka.start(use_zk_to_create_topic=False)
+
+        self.perform_produce_consume_validation()
+
+        # Make sure the ZooKeeper command line is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+        # Test both create() and query(), each of which leverages the ZooKeeper command line
+        # This tests the code in org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka
+        path="/foo"
+        value="{\"bar\": 0}"
+        self.zk.create(path, value=value)
+        if self.zk.query(path) != value:
+            raise Exception("Error creating and then querying a znode using the CLI with a TLS-enabled ZooKeeper quorum")
+
+        # Make sure the ConfigCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+        # This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
+        self.zk.describe(self.topic)
+
+        # Make sure the AclCommand CLI is able to talk to a TLS-enabled, encrypt-only ZooKeeper quorum
+        # This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
+        self.zk.list_acls(self.topic)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index c90c853..26537e3 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -52,6 +52,8 @@ class KafkaVersion(LooseVersion):
     def supports_named_listeners(self):
         return self >= V_0_10_2_0
 
+    def topic_command_supports_bootstrap_server(self):
+        return self >= V_2_3_0
 
 def get_version(node=None):
     """Return the version attached to the given node.