You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/19 10:37:08 UTC

[kafka] branch trunk updated: KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f264bfa  KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
f264bfa is described below

commit f264bfa2967f4b431f09d8b99d5f468fca310805
Author: Ewen Cheslack-Postava <me...@ewencp.org>
AuthorDate: Mon Mar 19 03:37:02 2018 -0700

    KAFKA-6676: Ensure Kafka chroot exists in system tests and use chroot on one test with security parameterizations (#4729)
    
    Ensures Kafka chroot exists in ZK when starting KafkaService so commands that use ZK and are executed before the first Kafka broker starts do not fail due to the missing chroot.
    
    Also uses a chroot in one test that has security parameterizations, so Kafka's test suite exercises these combinations. Previously, no tests exercised chroots.
    
    Changes were validated using sanity_checks which include the chroot-ed test as well as some non-chroot-ed tests.
---
 .../sanity_checks/test_console_consumer.py         |  2 +-
 tests/kafkatest/services/kafka/kafka.py            | 12 +++++++++
 tests/kafkatest/services/zookeeper.py              | 30 +++++++++++++++++-----
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 066d6d4..537755d 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -36,7 +36,7 @@ class ConsoleConsumerTest(Test):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}})
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, new_consumer=False)
 
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e563ab8..c4d4b24 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -163,6 +163,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.open_port(self.interbroker_security_protocol)
 
         self.start_minikdc(add_principals)
+        self._ensure_zk_chroot()
+
         Service.start(self)
 
         self.logger.info("Waiting for brokers to register at ZK")
@@ -183,6 +185,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def _ensure_zk_chroot(self):
+        self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
+        if self.zk_chroot:
+            if not self.zk_chroot.startswith('/'):
+                raise Exception("Zookeeper chroot must start with '/' but found " + self.zk_chroot)
+
+            parts = self.zk_chroot.split('/')[1:]
+            for i in range(len(parts)):
+                self.zk.create('/' + '/'.join(parts[:i+1]))
+
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index b181a12..5bda867 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -103,7 +103,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
         node.account.kill_java_processes(self.java_class_name(), allow_fail=False)
-        node.account.kill_java_processes(self.java_query_class_name(), allow_fail=False)
+        node.account.kill_java_processes(self.java_cli_class_name(), allow_fail=False)
         wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed out waiting for zookeeper to stop.")
 
     def clean_node(self, node):
@@ -113,7 +113,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                              (self.__class__.__name__, node.account))
         node.account.kill_java_processes(self.java_class_name(),
                                          clean_shutdown=False, allow_fail=True)
-        node.account.kill_java_processes(self.java_query_class_name(),
+        node.account.kill_java_processes(self.java_cli_class_name(),
                                          clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False)
 
@@ -134,18 +134,21 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                        (self.path.script("zookeeper-security-migration.sh", node), zk_acl, self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
+    def _check_chroot(self, chroot):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+
     def query(self, path, chroot=None):
         """
         Queries zookeeper for data associated with 'path' and returns all fields in the schema
         """
-        if chroot and not chroot.startswith("/"):
-            raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
+        self._check_chroot(chroot)
 
         chroot_path = ('' if chroot is None else chroot) + path
 
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s get %s" % \
-              (kafka_run_class, self.java_query_class_name(), self.connect_setting(), chroot_path)
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]
@@ -158,10 +161,25 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
                     result = match.groups()[0]
         return result
 
+    def create(self, path, chroot=None):
+        """
+        Create a znode at the given path
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s create %s ''" % \
+              (kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
+        self.logger.debug(cmd)
+        output = self.nodes[0].account.ssh_output(cmd)
+        self.logger.debug(output)
+
     def java_class_name(self):
         """ The class name of the Zookeeper quorum peers. """
         return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
 
-    def java_query_class_name(self):
+    def java_cli_class_name(self):
         """ The class name of the Zookeeper tool within Kafka. """
         return "kafka.tools.ZooKeeperMainWrapper"

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.