You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/06 02:09:22 UTC

[15/21] kafka git commit: KAFKA-3464: Add system tests for Connect with Kafka security enabled

KAFKA-3464: Add system tests for Connect with Kafka security enabled

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Ismael Juma, Gwen Shapira

Closes #1141 from ewencp/kafka-3464-connect-security-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98978139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98978139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98978139

Branch: refs/heads/0.10.0
Commit: 9897813957c1b5cd70a0dd02094f184f73e06979
Parents: 80ba01e
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Apr 4 18:49:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py             |  6 ++++
 tests/kafkatest/services/mirror_maker.py        |  1 -
 .../services/security/security_config.py        | 24 ++++++++-----
 .../tests/connect/connect_distributed_test.py   | 36 +++++++++++++++----
 tests/kafkatest/tests/connect/connect_test.py   | 37 +++++++++++++++-----
 .../templates/connect-distributed.properties    |  7 ++--
 .../templates/connect-standalone.properties     |  4 ++-
 7 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 76336e1..51dade3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -48,6 +48,7 @@ class ConnectServiceBase(Service):
     def __init__(self, context, num_nodes, kafka, files):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.security_config = kafka.security_config.client_config()
         self.files = files
 
     def pids(self, node):
@@ -89,6 +90,7 @@ class ConnectServiceBase(Service):
 
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
+        self.security_config.clean_node(node)
         node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
 
     def config_filenames(self):
@@ -153,6 +155,7 @@ class ConnectStandaloneService(ConnectServiceBase):
 
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
         cmd += " ".join(connector_configs)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
@@ -161,6 +164,7 @@ class ConnectStandaloneService(ConnectServiceBase):
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
+        self.security_config.setup_node(node)
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         remote_connector_configs = []
@@ -190,6 +194,7 @@ class ConnectDistributedService(ConnectServiceBase):
 
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
@@ -197,6 +202,7 @@ class ConnectDistributedService(ConnectServiceBase):
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
+        self.security_config.setup_node(node)
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         if self.connector_config_templates:

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 4386788..cb4b2c1 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -1,4 +1,3 @@
-
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b5efba8..1bbabd2 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -17,6 +17,7 @@ import os
 import subprocess
 from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
+import itertools
 
 class Keytool(object):
 
@@ -172,17 +173,22 @@ class SecurityConfig(TemplateRenderer):
         else:
             return ""
 
-    def __str__(self):
+    def props(self, prefix=''):
         """
-        Return properties as string with line separators.
+        Return properties as string with line separators, optionally with a prefix.
         This is used to append security config properties to
         a properties file.
+        :param prefix: prefix to add to each property
+        :return: a string containing line-separated properties
         """
+        if self.security_protocol == SecurityConfig.PLAINTEXT:
+            return ""
+        config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
+        # Extra blank lines ensure this can be appended/prepended safely
+        return "\n".join(itertools.chain([""], config_lines, [""]))
 
-        prop_str = ""
-        if self.security_protocol != SecurityConfig.PLAINTEXT:
-            for key, value in self.properties.items():
-                prop_str += ("\n" + key + "=" + value)
-            prop_str += "\n"
-        return prop_str
-
+    def __str__(self):
+        """
+        Return properties as a string with line separators.
+        """
+        return self.props()

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 9aa16ab..698a827 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -13,15 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 import subprocess, itertools, time
 from collections import Counter
 
-class ConnectDistributedTest(KafkaTest):
+class ConnectDistributedTest(Test):
     """
     Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
     another, validating the total output is identical to the input.
@@ -45,22 +49,39 @@ class ConnectDistributedTest(KafkaTest):
     SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
-        super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+        super(ConnectDistributedTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
             'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
 
-        self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
-        self.cc.log_level = "DEBUG"
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def test_file_source_and_sink(self):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
+        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+                                  security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+                                  topics=self.topics)
+
+        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc.log_level = "DEBUG"
+
+        self.zk.start()
+        self.kafka.start()
+
+
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    def test_file_source_and_sink(self, security_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
         """
 
+        self.setup_services(security_protocol=security_protocol)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
@@ -94,6 +115,7 @@ class ConnectDistributedTest(KafkaTest):
         """
         num_tasks = 3
 
+        self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 90f219a..7b57402 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -13,14 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectStandaloneService
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
 import hashlib, subprocess, json
 
-class ConnectStandaloneFileTest(KafkaTest):
+class ConnectStandaloneFileTest(Test):
     """
     Simple test of Kafka Connect that produces data from a file in one
     standalone process and consumes it on another, validating the output is
@@ -42,24 +46,39 @@ class ConnectStandaloneFileTest(KafkaTest):
     SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
-        super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+        super(ConnectStandaloneFileTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
             'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        }
 
-        self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+        self.zk = ZookeeperService(test_context, self.num_zk)
 
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
     @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
         assert converter != None, "converter type must be set"
         # Template parameters
         self.key_converter = converter
         self.value_converter = converter
         self.schemas = schemas
 
+        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+                                  security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+                                  topics=self.topics)
+
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
+                                                  consumer_timeout_ms=1000, new_consumer=True)
+
+
+        self.zk.start()
+        self.kafka.start()
+
         self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
         self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 7a7440a..48f5f78 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
 
 group.id={{ group|default("connect-cluster") }}
 
@@ -43,4 +46,4 @@ rest.advertised.host.name = {{ node.account.hostname }}
 
 # Reduce session timeouts so tests that kill workers don't need to wait as long to recover
 session.timeout.ms=10000
-consumer.session.timeout.ms=10000
\ No newline at end of file
+consumer.session.timeout.ms=10000

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index bf1daf7..09c6487 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
 
 key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
 value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}