Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/19 10:47:55 UTC

[kafka] branch 1.1 updated: KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 11e4d09  KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
11e4d09 is described below

commit 11e4d09f2356e8f9a8f1333af5e70debedbf1666
Author: Ewen Cheslack-Postava <me...@ewencp.org>
AuthorDate: Mon Mar 19 03:37:02 2018 -0700

    KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
    
    Ensures the Kafka chroot exists in ZK when starting KafkaService, so commands that use ZK and run before the first Kafka broker starts do not fail due to the missing chroot.
    
    Also uses a chroot in one test that has security parameterizations, so Kafka's test suite exercises these combinations. Previously, no tests exercised chroots.
    
    Changes were validated using sanity_checks, which include the chroot-ed test as well as some non-chroot-ed tests.
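
[Editor's note] As a minimal sketch, not part of the patch, the prefix walk that the new KafkaService._ensure_zk_chroot helper performs can be illustrated as follows; the chroot value "/kafka/cluster-1" is hypothetical, chosen only to show a nested path:

    # Sketch of the prefix walk in _ensure_zk_chroot; the chroot value is illustrative.
    chroot = "/kafka/cluster-1"
    if not chroot.startswith('/'):
        raise Exception("Zookeeper chroot must start with '/' but found " + chroot)
    parts = chroot.split('/')[1:]               # ["kafka", "cluster-1"]
    for i in range(len(parts)):
        path = '/' + '/'.join(parts[:i + 1])    # "/kafka", then "/kafka/cluster-1"
        print(path)                             # each prefix is created in ZK in turn via zk.create(path)

This way every intermediate znode exists before the brokers (or any ZK-backed command issued before the first broker starts) try to use paths under the chroot.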
---
 .../sanity_checks/test_console_consumer.py         |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 12 +++++++++
 tests/kafkatest/services/zookeeper.py              | 30 +++++++++++++++++-----
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 066d6d4..537755d 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -36,7 +36,7 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
 
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 619da9f..22674b9 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -163,6 +163,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.open_port(self.interbroker_security_protocol)
 
         self.start_minikdc(add_principals)
+        self._ensure_zk_chroot()
+
         Service.start(self)
 
         self.logger.info("Waiting for brokers to register at ZK")
@@ -183,6 +185,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def _ensure_zk_chroot(self):
+        self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
+        if self.zk_chroot:
+            if not self.zk_chroot.startswith('/'):
+                raise Exception("Zookeeper chroot must start with '/' but found " + self.zk_chroot)
+
+            parts = self.zk_chroot.split('/')[1:]
+            for i in range(len(parts)):
+                self.zk.create('/' + '/'.join(parts[:i+1]))
+
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index b181a12..5bda867 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -103,7 +103,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
         node.account.kill_java_processes(self.java_class_name(), allow_fail=False)
-        node.account.kill_java_processes(self.java_query_class_name(), allow_fail=False)
+        node.account.kill_java_processes(self.java_cli_class_name(), allow_fail=False)
         wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
 
     def clean_node(self, node):
@@ -113,7 +113,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                              (self.__class__.__name__, node.account))
         node.account.kill_java_processes(self.java_class_name(),
                                          clean_shutdown=False, allow_fail=True)
-        node.account.kill_java_processes(self.java_query_class_name(),
+        node.account.kill_java_processes(self.java_cli_class_name(),
                                          clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False)
 
@@ -134,18 +134,21 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl, self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
+    def _check_chroot(self, chroot):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+
     def query(self, path, chroot=None):
         """
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
         """
-        if chroot and not chroot.startswith("/"):
-            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+        self._check_chroot(chroot)
 
         chroot_path = ('' if chroot is None else chroot) + path
 
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s get %s" % \
-              (kafka_run_class, self.java_query_class_name(), self.connect_setting(), chroot_path)
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]
@@ -158,10 +161,25 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                     result = match.groups()[0]
         return result
 
+    def create(self, path, chroot=None):
+        """
+        Create a znode at the given path
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s create %s ''" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
+        self.logger.debug(cmd)
+        output = self.nodes[0].account.ssh_output(cmd)
+        self.logger.debug(output)
+
     def java_class_name(self):
         """ The class name of the Zookeeper quorum peers. """
         return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
 
-    def java_query_class_name(self):
+    def java_cli_class_name(self):
         """ The class name of the Zookeeper tool within Kafka. """
         return "kafka.tools.ZooKeeperMainWrapper"

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.